diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-02-18 14:20:33 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-02-18 14:20:33 +0000 |
commit | 8326fecaca6f864633c3e98fc15df6a081c112d5 (patch) | |
tree | f2c85e2bada34e3207f0ec915b2d9050ca998c5f | |
parent | e2170d47dfb8ab2e3685e284c6d2f8d0cea1bd33 (diff) | |
parent | fcee3e87e62f5c33fad955c0d1f8c616d4ea3cb5 (diff) | |
download | rabbitmq-server-8326fecaca6f864633c3e98fc15df6a081c112d5.tar.gz |
Merge in default
29 files changed, 856 insertions, 489 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index c0d6cc70..7d6b80a7 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -341,7 +341,7 @@ %% {reconnect_delay, 2.5} %% ]} %% End of my_first_shovel - ]}, + ]} %% Rather than specifying some values per-shovel, you can specify %% them for all shovels here. %% diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index d19acd00..a7e42503 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -502,6 +502,23 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>set_cluster_name</command> <arg choice="req">name</arg></cmdsynopsis></term> + <listitem> + <para> + Sets the cluster name. The cluster name is announced to + clients on connection, and used by the federation and + shovel plugins to record where a message has been. The + cluster name is by default derived from the hostname of + the first node in the cluster, but can be changed. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_cluster_name london</screen> + <para role="example"> + This sets the cluster name to "london". + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 6f6f4244..19eef65a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -73,7 +73,7 @@ -record(delivery, {mandatory, confirm, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). --record(event, {type, props, timestamp}). +-record(event, {type, props, reference = undefined, timestamp}). -record(message_properties, {expiry, needs_confirming = false}). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 6690d181..ee82bcb3 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -81,6 +81,14 @@ %% process as sys:get_status/1 would). Pass through a function which %% can be invoked on the state, get back the result. The state is not %% modified. +%% +%% 10) an mcall/1 function has been added for performing multiple +%% call/3 in parallel. Unlike multi_call, which sends the same request +%% to same-named processes residing on a supplied list of nodes, it +%% operates on name/request pairs, where name is anything accepted by +%% call/3, i.e. a pid, global name, local name, or local name on a +%% particular node. +%% %% All modifications are (C) 2009-2013 GoPivotal, Inc. @@ -190,6 +198,7 @@ cast/2, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, + mcall/1, with_state/2, enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). @@ -389,6 +398,85 @@ multi_call(Nodes, Name, Req, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> do_multi_call(Nodes, Name, Req, Timeout). +%%% ----------------------------------------------------------------- +%%% Make multiple calls to multiple servers, given pairs of servers +%%% and messages. +%%% Returns: {[{Dest, Reply}], [{Dest, Error}]} +%%% +%%% Dest can be pid() | RegName :: atom() | +%%% {Name :: atom(), Node :: atom()} | {global, Name :: atom()} +%%% +%%% A middleman process is used to avoid clogging up the callers +%%% message queue. +%%% ----------------------------------------------------------------- +mcall(CallSpecs) -> + Tag = make_ref(), + {_, MRef} = spawn_monitor( + fun() -> + Refs = lists:foldl( + fun ({Dest, _Request}=S, Dict) -> + dict:store(do_mcall(S), Dest, Dict) + end, dict:new(), CallSpecs), + collect_replies(Tag, Refs, [], []) + end), + receive + {'DOWN', MRef, _, _, {Tag, Result}} -> Result; + {'DOWN', MRef, _, _, Reason} -> exit(Reason) + end. + +do_mcall({{global,Name}=Dest, Request}) -> + %% whereis_name is simply an ets lookup, and is precisely what + %% global:send/2 does, yet we need a Ref to put in the call to the + %% server, so invoking whereis_name makes a lot more sense here. + case global:whereis_name(Name) of + Pid when is_pid(Pid) -> + MRef = erlang:monitor(process, Pid), + catch msend(Pid, MRef, Request), + MRef; + undefined -> + Ref = make_ref(), + self() ! {'DOWN', Ref, process, Dest, noproc}, + Ref + end; +do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) -> + {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6 + catch msend(Dest, MRef, Request), + MRef; +do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) -> + MRef = erlang:monitor(process, Dest), + catch msend(Dest, MRef, Request), + MRef. + +msend(Dest, MRef, Request) -> + erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]). + +collect_replies(Tag, Refs, Replies, Errors) -> + case dict:size(Refs) of + 0 -> exit({Tag, {Replies, Errors}}); + _ -> receive + {MRef, Reply} -> + {Refs1, Replies1} = handle_call_result(MRef, Reply, + Refs, Replies), + collect_replies(Tag, Refs1, Replies1, Errors); + {'DOWN', MRef, _, _, Reason} -> + Reason1 = case Reason of + noconnection -> nodedown; + _ -> Reason + end, + {Refs1, Errors1} = handle_call_result(MRef, Reason1, + Refs, Errors), + collect_replies(Tag, Refs1, Replies, Errors1) + end + end. + +handle_call_result(MRef, Result, Refs, AccList) -> + %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2} + %% here, so we must cope with MRefs that we've already seen and erased + case dict:find(MRef, Refs) of + {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}; + _ -> {Refs, AccList} + end. + %% ----------------------------------------------------------------- %% Apply a function to a generic server's state. %% ----------------------------------------------------------------- diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index d5f51db0..ae35526f 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -57,10 +57,10 @@ %% This is basically the same as for supervisor, except that: %% %% 1) start_link(Module, Args) becomes -%% start_link(Group, Module, Args). +%% start_link(Group, TxFun, Module, Args). %% %% 2) start_link({local, Name}, Module, Args) becomes -%% start_link({local, Name}, Group, Module, Args). +%% start_link({local, Name}, Group, TxFun, Module, Args). %% %% 3) start_link({global, Name}, Module, Args) is not available. %% @@ -70,6 +70,19 @@ %% application should invoke create_tables() (or table_definitions() %% if it wants to manage table creation itself). %% +%% The TxFun parameter to start_link/{4,5} is a function which the +%% mirrored supervisor can use to execute Mnesia transactions. In the +%% RabbitMQ server this goes via a worker pool; in other cases a +%% function like: +%% +%% tx_fun(Fun) -> +%% case mnesia:sync_transaction(Fun) of +%% {atomic, Result} -> Result; +%% {aborted, Reason} -> throw({error, Reason}) +%% end. +%% +%% could be used. +%% %% Internals %% --------- %% @@ -115,7 +128,7 @@ {attributes, record_info(fields, mirrored_sup_childspec)}]}). -define(TABLE_MATCH, {match, #mirrored_sup_childspec{ _ = '_' }}). --export([start_link/3, start_link/4, +-export([start_link/4, start_link/5, start_child/2, restart_child/2, delete_child/2, terminate_child/2, which_children/1, count_children/1, check_childspecs/1]). @@ -126,7 +139,7 @@ -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3, handle_cast/2]). --export([start_internal/2]). +-export([start_internal/3]). -export([create_tables/0, table_definitions/0]). -record(mirrored_sup_childspec, {key, mirroring_pid, childspec}). @@ -134,6 +147,7 @@ -record(state, {overall, delegate, group, + tx_fun, initial_childspecs}). %%---------------------------------------------------------------------------- @@ -160,19 +174,25 @@ -type group_name() :: any(). --spec start_link(GroupName, Module, Args) -> startlink_ret() when +-type(tx_fun() :: fun((fun(() -> A)) -> A)). + +-spec start_link(GroupName, TxFun, Module, Args) -> startlink_ret() when GroupName :: group_name(), + TxFun :: tx_fun(), Module :: module(), Args :: term(). --spec start_link(SupName, GroupName, Module, Args) -> startlink_ret() when +-spec start_link(SupName, GroupName, TxFun, Module, Args) -> + startlink_ret() when SupName :: supervisor2:sup_name(), GroupName :: group_name(), + TxFun :: tx_fun(), Module :: module(), Args :: term(). --spec start_internal(Group, ChildSpecs) -> Result when +-spec start_internal(Group, TxFun, ChildSpecs) -> Result when Group :: group_name(), + TxFun :: tx_fun(), ChildSpecs :: [supervisor2:child_spec()], Result :: {'ok', pid()} | {'error', term()}. @@ -190,18 +210,18 @@ behaviour_info(_Other) -> undefined. %%---------------------------------------------------------------------------- -start_link(Group, Mod, Args) -> - start_link0([], Group, init(Mod, Args)). +start_link(Group, TxFun, Mod, Args) -> + start_link0([], Group, TxFun, init(Mod, Args)). -start_link({local, SupName}, Group, Mod, Args) -> - start_link0([{local, SupName}], Group, init(Mod, Args)); +start_link({local, SupName}, Group, TxFun, Mod, Args) -> + start_link0([{local, SupName}], Group, TxFun, init(Mod, Args)); -start_link({global, _SupName}, _Group, _Mod, _Args) -> +start_link({global, _SupName}, _Group, _TxFun, _Mod, _Args) -> erlang:error(badarg). -start_link0(Prefix, Group, Init) -> +start_link0(Prefix, Group, TxFun, Init) -> case apply(?SUPERVISOR, start_link, - Prefix ++ [?MODULE, {overall, Group, Init}]) of + Prefix ++ [?MODULE, {overall, Group, TxFun, Init}]) of {ok, Pid} -> case catch call(Pid, {init, Pid}) of ok -> {ok, Pid}; E -> E @@ -225,7 +245,9 @@ count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2). check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs). call(Sup, Msg) -> ?GEN_SERVER:call(mirroring(Sup), Msg, infinity). -cast(Sup, Msg) -> ?GEN_SERVER:cast(mirroring(Sup), Msg). +cast(Sup, Msg) -> with_exit_handler( + fun() -> ok end, + fun() -> ?GEN_SERVER:cast(mirroring(Sup), Msg) end). find_call(Sup, Id, Msg) -> Group = call(Sup, group), @@ -257,14 +279,14 @@ mirroring(Sup) -> child(Sup, mirroring). %%---------------------------------------------------------------------------- -start_internal(Group, ChildSpecs) -> - ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, ChildSpecs}, +start_internal(Group, TxFun, ChildSpecs) -> + ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, TxFun, ChildSpecs}, [{timeout, infinity}]). %%---------------------------------------------------------------------------- -init({overall, _Group, ignore}) -> ignore; -init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> +init({overall, _Group, _TxFun, ignore}) -> ignore; +init({overall, Group, TxFun, {ok, {Restart, ChildSpecs}}}) -> %% Important: Delegate MUST start before Mirroring so that when we %% shut down from above it shuts down last, so Mirroring does not %% see it die. @@ -273,27 +295,30 @@ init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> {ok, {{one_for_all, 0, 1}, [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]}, temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, - {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]}, + {mirroring, {?MODULE, start_internal, [Group, TxFun, ChildSpecs]}, permanent, 16#ffffffff, worker, [?MODULE]}]}}; init({delegate, Restart}) -> {ok, {Restart, []}}; -init({mirroring, Group, ChildSpecs}) -> - {ok, #state{group = Group, initial_childspecs = ChildSpecs}}. +init({mirroring, Group, TxFun, ChildSpecs}) -> + {ok, #state{group = Group, + tx_fun = TxFun, + initial_childspecs = ChildSpecs}}. handle_call({init, Overall}, _From, State = #state{overall = undefined, delegate = undefined, group = Group, + tx_fun = TxFun, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), ?PG2:create(Group), ok = ?PG2:join(Group, Overall), Rest = ?PG2:get_members(Group) -- [Overall], case Rest of - [] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end); + [] -> TxFun(fun() -> delete_all(Group) end); _ -> ok end, [begin @@ -303,7 +328,8 @@ handle_call({init, Overall}, _From, Delegate = delegate(Overall), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of + case errors([maybe_start(Group, TxFun, Overall, Delegate, S) + || S <- ChildSpecs]) of [] -> {reply, ok, State1}; Errors -> {stop, {shutdown, Errors}, State1} end; @@ -311,16 +337,18 @@ handle_call({init, Overall}, _From, handle_call({start_child, ChildSpec}, _From, State = #state{overall = Overall, delegate = Delegate, - group = Group}) -> - {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of + group = Group, + tx_fun = TxFun}) -> + {reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of already_in_mnesia -> {error, already_present}; {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; Else -> Else end, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, - group = Group}) -> - {reply, stop(Group, Delegate, Id), State}; + group = Group, + tx_fun = TxFun}) -> + {reply, stop(Group, TxFun, Delegate, Id), State}; handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) -> {reply, apply(?SUPERVISOR, F, [Delegate | A]), State}; @@ -343,7 +371,7 @@ handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. handle_info({'DOWN', _Ref, process, Pid, Reason}, - State = #state{overall = Pid, group = Group}) -> + State = #state{delegate = Pid, group = Group}) -> %% Since the delegate is temporary, its death won't cause us to %% die. Since the overall supervisor kills processes in reverse %% order when shutting down "from above" and we started after the @@ -357,14 +385,15 @@ handle_info({'DOWN', _Ref, process, Pid, Reason}, {stop, Reason, State}; handle_info({'DOWN', _Ref, process, Pid, _Reason}, - State = #state{delegate = Delegate, group = Group, - overall = O}) -> + State = #state{delegate = Delegate, + group = Group, + tx_fun = TxFun, + overall = O}) -> %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [O | _] -> {atomic, ChildSpecs} = - mnesia:transaction( - fun() -> update_all(O, Pid) end), + [O | _] -> ChildSpecs = + TxFun(fun() -> update_all(O, Pid) end), [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; _ -> [] end, @@ -387,14 +416,14 @@ code_change(_OldVsn, State, _Extra) -> tell_all_peers_to_die(Group, Reason) -> [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]]. -maybe_start(Group, Overall, Delegate, ChildSpec) -> - case mnesia:transaction( - fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of - {atomic, start} -> start(Delegate, ChildSpec); - {atomic, undefined} -> already_in_mnesia; - {atomic, Pid} -> {already_in_mnesia, Pid}; +maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) -> + try TxFun(fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of + start -> start(Delegate, ChildSpec); + undefined -> already_in_mnesia; + Pid -> {already_in_mnesia, Pid} + catch %% If we are torn down while in the transaction... - {aborted, E} -> {error, E} + {error, E} -> {error, E} end. check_start(Group, Overall, Delegate, ChildSpec) -> @@ -429,11 +458,12 @@ delete(Group, Id) -> start(Delegate, ChildSpec) -> apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]). -stop(Group, Delegate, Id) -> - case mnesia:transaction(fun() -> check_stop(Group, Delegate, Id) end) of - {atomic, deleted} -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); - {atomic, running} -> {error, running}; - {aborted, E} -> {error, E} +stop(Group, TxFun, Delegate, Id) -> + try TxFun(fun() -> check_stop(Group, Delegate, Id) end) of + deleted -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); + running -> {error, running} + catch + {error, E} -> {error, E} end. check_stop(Group, Delegate, Id) -> diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index 780ef11d..6d7c55dd 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -175,14 +175,14 @@ test_start_idempotence() -> test_unsupported() -> try - ?MS:start_link({global, foo}, get_group(group), ?MODULE, + ?MS:start_link({global, foo}, get_group(group), fun tx_fun/1, ?MODULE, {sup, one_for_one, []}), exit(no_global) catch error:badarg -> ok end, try - ?MS:start_link({local, foo}, get_group(group), ?MODULE, + ?MS:start_link({local, foo}, get_group(group), fun tx_fun/1, ?MODULE, {sup, simple_one_for_one, []}), exit(no_sofo) catch error:badarg -> @@ -192,7 +192,7 @@ test_unsupported() -> %% Just test we don't blow up test_ignore() -> - ?MS:start_link({local, foo}, get_group(group), ?MODULE, + ?MS:start_link({local, foo}, get_group(group), fun tx_fun/1, ?MODULE, {sup, fake_strategy_for_ignore, []}), passed. @@ -202,7 +202,7 @@ test_startup_failure() -> test_startup_failure(Fail) -> process_flag(trap_exit, true), - ?MS:start_link(get_group(group), ?MODULE, + ?MS:start_link(get_group(group), fun tx_fun/1, ?MODULE, {sup, one_for_one, [childspec(Fail)]}), receive {'EXIT', _, shutdown} -> @@ -236,10 +236,11 @@ start_sup(Name, Group) -> start_sup({Name, []}, Group). start_sup0(anon, Group, ChildSpecs) -> - ?MS:start_link(Group, ?MODULE, {sup, one_for_one, ChildSpecs}); + ?MS:start_link(Group, fun tx_fun/1, ?MODULE, + {sup, one_for_one, ChildSpecs}); start_sup0(Name, Group, ChildSpecs) -> - ?MS:start_link({local, Name}, Group, ?MODULE, + ?MS:start_link({local, Name}, Group, fun tx_fun/1, ?MODULE, {sup, one_for_one, ChildSpecs}). childspec(Id) -> @@ -258,6 +259,12 @@ pid_of(Id) -> {received, Pid, ping} = call(Id, ping), Pid. +tx_fun(Fun) -> + case mnesia:sync_transaction(Fun) of + {atomic, Result} -> Result; + {aborted, Reason} -> throw({error, Reason}) + end. + inc_group() -> Count = case get(counter) of undefined -> 0; diff --git a/src/rabbit.erl b/src/rabbit.erl index 045c5d58..bd4f1dbc 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -20,7 +20,7 @@ -export([start/0, boot/0, stop/0, stop_and_halt/0, await_startup/0, status/0, is_running/0, - is_running/1, environment/0, rotate_logs/1, force_event_refresh/0, + is_running/1, environment/0, rotate_logs/1, force_event_refresh/1, start_fhc/0]). -export([start/2, stop/1]). @@ -227,7 +227,7 @@ -spec(is_running/1 :: (node()) -> boolean()). -spec(environment/0 :: () -> [{param(), term()}]). -spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). @@ -393,7 +393,8 @@ status() -> {running_applications, rabbit_misc:which_applications()}, {os, os:type()}, {erlang_version, erlang:system_info(system_version)}, - {memory, rabbit_vm:memory()}], + {memory, rabbit_vm:memory()}, + {alarms, alarms()}], S2 = rabbit_misc:filter_exit_map( fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, [{vm_memory_high_watermark, {vm_memory_monitor, @@ -416,6 +417,13 @@ status() -> end}], S1 ++ S2 ++ S3 ++ S4. +alarms() -> + Alarms = rabbit_misc:with_exit_handler(rabbit_misc:const([]), + fun rabbit_alarm:get_alarms/0), + N = node(), + %% [{{resource_limit,memory,rabbit@mercurio},[]}] + [Limit || {{resource_limit, Limit, Node}, _} <- Alarms, Node =:= N]. + is_running() -> is_running(node()). is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit). @@ -696,11 +704,11 @@ log_rotation_result(ok, {error, SaslLogError}) -> log_rotation_result(ok, ok) -> ok. -force_event_refresh() -> - rabbit_direct:force_event_refresh(), - rabbit_networking:force_connection_event_refresh(), - rabbit_channel:force_event_refresh(), - rabbit_amqqueue:force_event_refresh(). +force_event_refresh(Ref) -> + rabbit_direct:force_event_refresh(Ref), + rabbit_networking:force_connection_event_refresh(Ref), + rabbit_channel:force_event_refresh(Ref), + rabbit_amqqueue:force_event_refresh(Ref). %%--------------------------------------------------------------------------- %% misc diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b71410fe..67bf000d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -24,7 +24,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([force_event_refresh/0, notify_policy_changed/1]). +-export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). @@ -110,7 +110,7 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), @@ -221,36 +221,37 @@ start(Qs) -> find_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( + mnesia:async_dirty( fun () -> qlc:e(qlc:q([Q || Q = #amqqueue{name = Name, pid = Pid} <- mnesia:table(rabbit_durable_queue), - mnesia:read(rabbit_queue, Name, read) =:= [], - node(Pid) == Node])) + node(Pid) == Node, + mnesia:read(rabbit_queue, Name, read) =:= []])) end). recover_durable_queues(QueuesAndRecoveryTerms) -> - Qs = [{start_queue_process(node(), Q), Terms} || - {Q, Terms} <- QueuesAndRecoveryTerms], - [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs, - gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}]. + {Results, Failures} = + gen_server2:mcall([{start_queue_process(node(), Q), + {init, {self(), Terms}}} || + {Q, Terms} <- QueuesAndRecoveryTerms]), + [rabbit_log:error("Queue ~p failed to initialise: ~p~n", + [Pid, Error]) || {Pid, Error} <- Failures], + [Q || {_, {new, Q}} <- Results]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q0 = rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - gm_pids = []}), - {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), - Q1 = start_queue_process(Node, Q0), - gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity). + Q = rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + gm_pids = []}), + {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -313,7 +314,7 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), - Q#amqqueue{pid = Pid}. + Pid. add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), @@ -354,14 +355,14 @@ with(Name, F, E) -> {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do - %% with the QPid. + %% with the QPid. F() should be written s.t. that this + %% cannot happen, so we bail if it does since that + %% indicates a code bug and we don't want to get stuck in + %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> - case rabbit_misc:is_process_alive(QPid) of - true -> E(not_found_or_absent_dirty(Name)); - false -> timer:sleep(25), - with(Name, F, E) - end + fun () -> false = rabbit_misc:is_process_alive(QPid), + timer:sleep(25), + with(Name, F, E) end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) @@ -503,19 +504,20 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). %% the first place since a node failed). Therefore we keep poking at %% the list of queues until we were able to talk to a live process or %% the queue no longer exists. -force_event_refresh() -> force_event_refresh([Q#amqqueue.name || Q <- list()]). +force_event_refresh(Ref) -> + force_event_refresh([Q#amqqueue.name || Q <- list()], Ref). -force_event_refresh(QNames) -> +force_event_refresh(QNames, Ref) -> Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], - {_, Bad} = rabbit_misc:multi_call( - [Q#amqqueue.pid || Q <- Qs], force_event_refresh), + {_, Bad} = gen_server2:mcall( + [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]), FailedPids = [Pid || {Pid, _Reason} <- Bad], Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs, lists:member(Pid, FailedPids)], case Failed of [] -> ok; _ -> timer:sleep(?FAILOVER_WAIT_MILLIS), - force_event_refresh(Failed) + force_event_refresh(Failed, Ref) end. notify_policy_changed(#amqqueue{pid = QPid}) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c53ff4cf..25f0a18c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -20,8 +20,8 @@ -behaviour(gen_server2). --define(SYNC_INTERVAL, 25). %% milliseconds --define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(SYNC_INTERVAL, 200). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster -export([start_link/1, info_keys/0]). @@ -328,10 +328,13 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. -next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +next_state(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> assert_invariant(State), {MsgIds, BQS1} = BQ:drain_confirmed(BQS), - State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}), + MTC1 = confirm_messages(MsgIds, MTC), + State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}, case BQ:needs_timeout(BQS1) of false -> {stop_sync_timer(State1), hibernate }; idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; @@ -412,9 +415,9 @@ maybe_send_drained(WasEmpty, State) -> end, State. -confirm_messages([], State) -> - State; -confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> +confirm_messages([], MTC) -> + MTC; +confirm_messages(MsgIds, MTC) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> @@ -428,7 +431,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> end end, {gb_trees:empty(), MTC}, MsgIds), rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), - State#q{msg_id_to_channel = MTC1}. + MTC1. send_or_record_confirm(#delivery{confirm = false}, State) -> {never, State}; @@ -457,14 +460,13 @@ send_mandatory(#delivery{mandatory = true, discard(#delivery{confirm = Confirm, sender = SenderPid, - message = #basic_message{id = MsgId}}, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - case Confirm of - true -> confirm_messages([MsgId], State); - false -> State - end, + message = #basic_message{id = MsgId}}, BQ, BQS, MTC) -> + MTC1 = case Confirm of + true -> confirm_messages([MsgId], MTC); + false -> MTC + end, BQS1 = BQ:discard(MsgId, SenderPid, BQS), - State1#q{backing_queue_state = BQS1}. + {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -487,20 +489,22 @@ run_message_queue(ActiveConsumersChanged, State) -> attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> case rabbit_queue_consumers:deliver( fun (true) -> true = BQ:is_empty(BQS), {AckTag, BQS1} = BQ:publish_delivered( Message, Props, SenderPid, BQS), - {{Message, Delivered, AckTag}, - State#q{backing_queue_state = BQS1}}; + {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, - discard(Delivery, State)} + discard(Delivery, BQ, BQS, MTC)} end, qname(State), State#q.consumers) of - {delivered, ActiveConsumersChanged, State1, Consumers} -> + {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} -> {delivered, maybe_notify_decorators( ActiveConsumersChanged, - State1#q{consumers = Consumers})}; + State#q{backing_queue_state = BQS1, + msg_id_to_channel = MTC1, + consumers = Consumers})}; {undelivered, ActiveConsumersChanged, Consumers} -> {undelivered, maybe_notify_decorators( ActiveConsumersChanged, @@ -522,8 +526,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {delivered, State3} -> State3; %% The next one is an optimisation - {undelivered, State3 = #q{ttl = 0, dlx = undefined}} -> - discard(Delivery, State3); + {undelivered, State3 = #q{ttl = 0, dlx = undefined, + backing_queue_state = BQS2, + msg_id_to_channel = MTC}} -> + {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), + State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; {undelivered, State3 = #q{backing_queue_state = BQS2}} -> BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), {Dropped, State4 = #q{backing_queue_state = BQS4}} = @@ -819,14 +826,15 @@ emit_stats(State, Extra) -> not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). -emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) -> +emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) -> rabbit_event:notify(consumer_created, [{consumer_tag, CTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, {channel, ChPid}, {queue, QName}, - {arguments, Args}]). + {arguments, Args}], + Ref). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> rabbit_event:notify(consumer_deleted, @@ -962,7 +970,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), Args), + not NoAck, qname(State1), Args, none), notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1043,19 +1051,19 @@ handle_call(sync_mirrors, _From, State) -> handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State); -handle_call(force_event_refresh, _From, +handle_call({force_event_refresh, Ref}, _From, State = #q{consumers = Consumers, exclusive_consumer = Exclusive}) -> - rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref), QName = qname(State), AllConsumers = rabbit_queue_consumers:all(Consumers), case Exclusive of none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName, Args) || + Ch, CTag, false, AckRequired, QName, Args, Ref) || {Ch, CTag, AckRequired, Args} <- AllConsumers]; {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers, emit_consumer_created( - Ch, CTag, true, AckRequired, QName, Args) + Ch, CTag, true, AckRequired, QName, Args, Ref) end, reply(ok, State). diff --git a/src/rabbit_auth_backend_dummy.erl b/src/rabbit_auth_backend_dummy.erl new file mode 100644 index 00000000..1a3db732 --- /dev/null +++ b/src/rabbit_auth_backend_dummy.erl @@ -0,0 +1,49 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_auth_backend_dummy). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_backend). + +-export([description/0]). +-export([user/0]). +-export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). + +-ifdef(use_specs). + +-spec(user/0 :: () -> rabbit_types:user()). + +-endif. + +%% A user to be used by the direct client when permission checks are +%% not needed. This user can do anything AMQPish. +user() -> #user{username = <<"dummy">>, + tags = [], + auth_backend = ?MODULE, + impl = none}. + +%% Implementation of rabbit_auth_backend + +description() -> + [{name, <<"Dummy">>}, + {description, <<"Database for the dummy user">>}]. + +check_user_login(_, _) -> + {refused, "cannot log in conventionally as dummy user", []}. + +check_vhost_access(#user{}, _VHostPath) -> true. +check_resource_access(#user{}, #resource{}, _Permission) -> true. diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 61919d05..ebeac1f7 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -22,15 +22,18 @@ -export([description/0]). -export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). --export([add_user/2, delete_user/1, change_password/2, set_tags/2, - list_users/0, user_info_keys/0, lookup_user/1, clear_password/1]). --export([make_salt/0, check_password/2, change_password_hash/2, - hash_password/1]). --export([set_permissions/5, clear_permissions/2, - list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, - list_user_vhost_permissions/2, perms_info_keys/0, - vhost_perms_info_keys/0, user_perms_info_keys/0, - user_vhost_perms_info_keys/0]). +-export([add_user/2, delete_user/1, lookup_user/1, + change_password/2, clear_password/1, + hash_password/1, change_password_hash/2, + set_tags/2, set_permissions/5, clear_permissions/2]). +-export([user_info_keys/0, perms_info_keys/0, + user_perms_info_keys/0, vhost_perms_info_keys/0, + user_vhost_perms_info_keys/0, + list_users/0, list_permissions/0, + list_user_permissions/1, list_vhost_permissions/1, + list_user_vhost_permissions/2]). + +%%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -38,45 +41,39 @@ -spec(add_user/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok'). -spec(delete_user/1 :: (rabbit_types:username()) -> 'ok'). +-spec(lookup_user/1 :: (rabbit_types:username()) + -> rabbit_types:ok(rabbit_types:internal_user()) + | rabbit_types:error('not_found')). -spec(change_password/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok'). -spec(clear_password/1 :: (rabbit_types:username()) -> 'ok'). --spec(make_salt/0 :: () -> binary()). --spec(check_password/2 :: (rabbit_types:password(), - rabbit_types:password_hash()) -> boolean()). --spec(change_password_hash/2 :: (rabbit_types:username(), - rabbit_types:password_hash()) -> 'ok'). -spec(hash_password/1 :: (rabbit_types:password()) -> rabbit_types:password_hash()). +-spec(change_password_hash/2 :: (rabbit_types:username(), + rabbit_types:password_hash()) -> 'ok'). -spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok'). --spec(list_users/0 :: () -> [rabbit_types:infos()]). --spec(user_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(lookup_user/1 :: (rabbit_types:username()) - -> rabbit_types:ok(rabbit_types:internal_user()) - | rabbit_types:error('not_found')). -spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) -> 'ok'). +-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(user_vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(list_users/0 :: () -> [rabbit_types:infos()]). -spec(list_permissions/0 :: () -> [rabbit_types:infos()]). --spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_user_permissions/1 :: (rabbit_types:username()) -> [rabbit_types:infos()]). +-spec(list_vhost_permissions/1 :: + (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_user_vhost_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) -> [rabbit_types:infos()]). --spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(user_vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). + -endif. %%---------------------------------------------------------------------------- - --define(PERMS_INFO_KEYS, [configure, write, read]). --define(USER_INFO_KEYS, [user, tags]). - %% Implementation of rabbit_auth_backend description() -> @@ -85,11 +82,14 @@ description() -> check_user_login(Username, []) -> internal_check_user_login(Username, fun(_) -> true end); -check_user_login(Username, [{password, Password}]) -> +check_user_login(Username, [{password, Cleartext}]) -> internal_check_user_login( - Username, fun(#internal_user{password_hash = Hash}) -> - check_password(Password, Hash) - end); + Username, + fun (#internal_user{password_hash = <<Salt:4/binary, Hash/binary>>}) -> + Hash =:= salted_md5(Salt, Cleartext); + (#internal_user{}) -> + false + end); check_user_login(Username, AuthProps) -> exit({unknown_auth_props, Username, AuthProps}). @@ -145,42 +145,43 @@ permission_index(read) -> #permission.read. add_user(Username, Password) -> rabbit_log:info("Creating user '~s'~n", [Username]), - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_user, Username}) of - [] -> - ok = mnesia:write( - rabbit_user, - #internal_user{username = Username, - password_hash = - hash_password(Password), - tags = []}, - write); - _ -> - mnesia:abort({user_already_exists, Username}) - end - end), - R. + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_user, Username}) of + [] -> + ok = mnesia:write( + rabbit_user, + #internal_user{username = Username, + password_hash = + hash_password(Password), + tags = []}, + write); + _ -> + mnesia:abort({user_already_exists, Username}) + end + end). delete_user(Username) -> rabbit_log:info("Deleting user '~s'~n", [Username]), - R = rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - ok = mnesia:delete({rabbit_user, Username}), - [ok = mnesia:delete_object( - rabbit_user_permission, R, write) || - R <- mnesia:match_object( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = '_'}, - permission = '_'}, - write)], - ok - end)), - R. + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + ok = mnesia:delete({rabbit_user, Username}), + [ok = mnesia:delete_object( + rabbit_user_permission, R, write) || + R <- mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = '_'}, + permission = '_'}, + write)], + ok + end)). + +lookup_user(Username) -> + rabbit_misc:dirty_read({rabbit_user, Username}). change_password(Username, Password) -> rabbit_log:info("Changing password for '~s'~n", [Username]), @@ -190,70 +191,44 @@ clear_password(Username) -> rabbit_log:info("Clearing password for '~s'~n", [Username]), change_password_hash(Username, <<"">>). -change_password_hash(Username, PasswordHash) -> - R = update_user(Username, fun(User) -> - User#internal_user{ - password_hash = PasswordHash } - end), - R. - hash_password(Cleartext) -> - Salt = make_salt(), - Hash = salted_md5(Salt, Cleartext), - <<Salt/binary, Hash/binary>>. - -check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) -> - Hash =:= salted_md5(Salt, Cleartext); -check_password(_Cleartext, _Any) -> - false. - -make_salt() -> {A1,A2,A3} = now(), random:seed(A1, A2, A3), Salt = random:uniform(16#ffffffff), - <<Salt:32>>. + SaltBin = <<Salt:32>>, + Hash = salted_md5(SaltBin, Cleartext), + <<SaltBin/binary, Hash/binary>>. + +change_password_hash(Username, PasswordHash) -> + update_user(Username, fun(User) -> + User#internal_user{ + password_hash = PasswordHash } + end). salted_md5(Salt, Cleartext) -> Salted = <<Salt/binary, Cleartext/binary>>, erlang:md5(Salted). set_tags(Username, Tags) -> - rabbit_log:info("Setting user tags for user '~s' to ~p~n", [Username, Tags]), - R = update_user(Username, fun(User) -> - User#internal_user{tags = Tags} - end), - R. - -update_user(Username, Fun) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - {ok, User} = lookup_user(Username), - ok = mnesia:write(rabbit_user, Fun(User), write) - end)). - -list_users() -> - [[{user, Username}, {tags, Tags}] || - #internal_user{username = Username, tags = Tags} <- - mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. - -user_info_keys() -> ?USER_INFO_KEYS. - -lookup_user(Username) -> - rabbit_misc:dirty_read({rabbit_user, Username}). - -validate_regexp(RegexpBin) -> - Regexp = binary_to_list(RegexpBin), - case re:compile(Regexp) of - {ok, _} -> ok; - {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) - end. + rabbit_log:info("Setting user tags for user '~s' to ~p~n", + [Username, Tags]), + update_user(Username, fun(User) -> + User#internal_user{tags = Tags} + end). set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> - rabbit_log:info("Setting permissions for '~s' in '~s' to '~s', '~s', '~s'~n", + rabbit_log:info("Setting permissions for " + "'~s' in '~s' to '~s', '~s', '~s'~n", [Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]), - lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), + lists:map( + fun (RegexpBin) -> + Regexp = binary_to_list(RegexpBin), + case re:compile(Regexp) of + {ok, _} -> ok; + {error, Reason} -> throw({error, {invalid_regexp, + Regexp, Reason}}) + end + end, [ConfigurePerm, WritePerm, ReadPerm]), rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -269,7 +244,6 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> write) end)). - clear_permissions(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( @@ -280,32 +254,36 @@ clear_permissions(Username, VHostPath) -> virtual_host = VHostPath}}) end)). +update_user(Username, Fun) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + {ok, User} = lookup_user(Username), + ok = mnesia:write(rabbit_user, Fun(User), write) + end)). + +%%---------------------------------------------------------------------------- +%% Listing + +-define(PERMS_INFO_KEYS, [configure, write, read]). +-define(USER_INFO_KEYS, [user, tags]). + +user_info_keys() -> ?USER_INFO_KEYS. + perms_info_keys() -> [user, vhost | ?PERMS_INFO_KEYS]. vhost_perms_info_keys() -> [user | ?PERMS_INFO_KEYS]. user_perms_info_keys() -> [vhost | ?PERMS_INFO_KEYS]. user_vhost_perms_info_keys() -> ?PERMS_INFO_KEYS. +list_users() -> + [[{user, Username}, {tags, Tags}] || + #internal_user{username = Username, tags = Tags} <- + mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. + list_permissions() -> list_permissions(perms_info_keys(), match_user_vhost('_', '_')). -list_vhost_permissions(VHostPath) -> - list_permissions( - vhost_perms_info_keys(), - rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))). - -list_user_permissions(Username) -> - list_permissions( - user_perms_info_keys(), - rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))). - -list_user_vhost_permissions(Username, VHostPath) -> - list_permissions( - user_vhost_perms_info_keys(), - rabbit_misc:with_user_and_vhost( - Username, VHostPath, match_user_vhost(Username, VHostPath))). - -filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)]. - list_permissions(Keys, QueryThunk) -> [filter_props(Keys, [{user, Username}, {vhost, VHostPath}, @@ -320,6 +298,24 @@ list_permissions(Keys, QueryThunk) -> %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction(QueryThunk)]. +filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)]. + +list_user_permissions(Username) -> + list_permissions( + user_perms_info_keys(), + rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))). + +list_vhost_permissions(VHostPath) -> + list_permissions( + vhost_perms_info_keys(), + rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))). + +list_user_vhost_permissions(Username, VHostPath) -> + list_permissions( + user_vhost_perms_info_keys(), + rabbit_misc:with_user_and_vhost( + Username, VHostPath, match_user_vhost(Username, VHostPath))). + match_user_vhost(Username, VHostPath) -> fun () -> mnesia:match_object( rabbit_user_permission, diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 11e6bd38..1b4a07e3 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -169,7 +169,7 @@ add(Binding, InnerFun) -> ok -> case mnesia:read({rabbit_route, B}) of [] -> add(Src, Dst, B); - [_] -> fun rabbit_misc:const_ok/0 + [_] -> fun () -> ok end end; {error, _} = Err -> rabbit_misc:const(Err) @@ -200,13 +200,15 @@ remove(Binding, InnerFun) -> binding_action( Binding, fun (Src, Dst, B) -> - case mnesia:read(rabbit_route, B, write) =:= [] andalso - mnesia:read(rabbit_durable_route, B, write) =/= [] of - true -> rabbit_misc:const({error, binding_not_found}); - false -> case InnerFun(Src, Dst) of - ok -> remove(Src, Dst, B); - {error, _} = Err -> rabbit_misc:const(Err) - end + case mnesia:read(rabbit_route, B, write) of + [] -> case mnesia:read(rabbit_durable_route, B, write) of + [] -> rabbit_misc:const(ok); + _ -> rabbit_misc:const({error, binding_not_found}) + end; + _ -> case InnerFun(Src, Dst) of + ok -> remove(Src, Dst, B); + {error, _} = Err -> rabbit_misc:const(Err) + end end end, fun absent_errs_only/1). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4d866908..7907c96c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -24,7 +24,7 @@ -export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). --export([force_event_refresh/0]). +-export([force_event_refresh/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -106,7 +106,7 @@ -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(refresh_config_local/0 :: () -> 'ok'). -spec(ready_for_close/1 :: (pid()) -> 'ok'). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -endif. @@ -179,8 +179,8 @@ refresh_config_local() -> ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). -force_event_refresh() -> - [gen_server2:cast(C, force_event_refresh) || C <- list()], +force_event_refresh(Ref) -> + [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()], ok. %%--------------------------------------------------------------------------- @@ -335,8 +335,9 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> || {ConsumerTag, CreditDrained} <- CTagCredit], noreply(State); -handle_cast(force_event_refresh, State) -> - rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), +handle_cast({force_event_refresh, Ref}, State) -> + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State), + Ref), noreply(State); handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> @@ -496,15 +497,14 @@ check_user_id_header(#'P_basic'{user_id = undefined}, _) -> check_user_id_header(#'P_basic'{user_id = Username}, #ch{user = #user{username = Username}}) -> ok; +check_user_id_header( + #'P_basic'{}, #ch{user = #user{auth_backend = rabbit_auth_backend_dummy}}) -> + ok; check_user_id_header(#'P_basic'{user_id = Claimed}, - #ch{user = #user{username = Actual, - tags = Tags}}) -> - case lists:member(impersonator, Tags) of - true -> ok; - false -> precondition_failed( - "user_id property set to '~s' but authenticated user was " - "'~s'", [Claimed, Actual]) - end. + #ch{user = #user{username = Actual}}) -> + precondition_failed( + "user_id property set to '~s' but authenticated user was '~s'", + [Claimed, Actual]). check_expiration_header(Props) -> case rabbit_basic:parse_expiration(Props) of @@ -1439,8 +1439,9 @@ notify_limiter(Limiter, Acked) -> end end. -deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, - mandatory = false}, +deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, + confirm = false, + mandatory = false}, []}, State) -> %% optimisation ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), State; diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index f3463286..746f2bdb 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -90,6 +90,7 @@ status, environment, report, + set_cluster_name, eval, close_connection, @@ -527,6 +528,10 @@ action(report, Node, _Args, _Opts, Inform) -> [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts], ok; +action(set_cluster_name, Node, [Name], _Opts, Inform) -> + Inform("Setting cluster name to ~s", [Name]), + rpc_call(Node, rabbit_nodes, set_cluster_name, [list_to_binary(Name)]); + action(eval, Node, [Expr], _Opts, _Inform) -> case erl_scan:string(Expr) of {ok, Scanned, _} -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 5a004792..1a5f400b 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, force_event_refresh/0, list/0, connect/5, +-export([boot/0, force_event_refresh/1, list/0, connect/5, start_channel/9, disconnect/2]). %% Internal -export([list_local/0]). @@ -28,10 +28,10 @@ -ifdef(use_specs). -spec(boot/0 :: () -> 'ok'). --spec(force_event_refresh/0 :: () -> 'ok'). +-spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). --spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user() | +-spec(connect/5 :: (({'none', 'none'} | {rabbit_types:username(), 'none'} | {rabbit_types:username(), rabbit_types:password()}), rabbit_types:vhost(), rabbit_types:protocol(), pid(), rabbit_event:event_props()) -> @@ -54,8 +54,8 @@ boot() -> rabbit_sup:start_supervisor_child( [{local, rabbit_direct_client_sup}, {rabbit_channel_sup, start_link, []}]). -force_event_refresh() -> - [Pid ! force_event_refresh || Pid<- list()], +force_event_refresh(Ref) -> + [Pid ! {force_event_refresh, Ref} || Pid <- list()], ok. list_local() -> @@ -67,37 +67,39 @@ list() -> %%---------------------------------------------------------------------------- -connect(User = #user{}, VHost, Protocol, Pid, Infos) -> - try rabbit_access_control:check_vhost_access(User, VHost) of - ok -> ok = pg_local:join(rabbit_direct, Pid), - rabbit_event:notify(connection_created, Infos), - {ok, {User, rabbit_reader:server_properties(Protocol)}} - catch - exit:#amqp_error{name = access_refused} -> - {error, access_refused} - end; +connect({none, _}, VHost, Protocol, Pid, Infos) -> + connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end, + VHost, Protocol, Pid, Infos); + +connect({Username, none}, VHost, Protocol, Pid, Infos) -> + connect0(fun () -> rabbit_access_control:check_user_login(Username, []) end, + VHost, Protocol, Pid, Infos); connect({Username, Password}, VHost, Protocol, Pid, Infos) -> connect0(fun () -> rabbit_access_control:check_user_pass_login( Username, Password) end, - VHost, Protocol, Pid, Infos); - -connect(Username, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> rabbit_access_control:check_user_login( - Username, []) end, VHost, Protocol, Pid, Infos). connect0(AuthFun, VHost, Protocol, Pid, Infos) -> case rabbit:is_running() of true -> case AuthFun() of {ok, User} -> - connect(User, VHost, Protocol, Pid, Infos); + connect1(User, VHost, Protocol, Pid, Infos); {refused, _M, _A} -> {error, {auth_failure, "Refused"}} end; false -> {error, broker_not_found_on_node} end. +connect1(User, VHost, Protocol, Pid, Infos) -> + try rabbit_access_control:check_vhost_access(User, VHost) of + ok -> ok = pg_local:join(rabbit_direct, Pid), + rabbit_event:notify(connection_created, Infos), + {ok, {User, rabbit_reader:server_properties(Protocol)}} + catch + exit:#amqp_error{name = access_refused} -> + {error, access_refused} + end. start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector) -> diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index a713d76b..e0226955 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -22,7 +22,7 @@ -export([init_stats_timer/2, init_disabled_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]). -export([stats_level/2, if_enabled/3]). --export([notify/2, notify_if/3]). +-export([notify/2, notify/3, notify_if/3]). %%---------------------------------------------------------------------------- @@ -41,6 +41,7 @@ -type(event() :: #event { type :: event_type(), props :: event_props(), + reference :: 'none' | reference(), timestamp :: event_timestamp() }). -type(level() :: 'none' | 'coarse' | 'fine'). @@ -58,6 +59,7 @@ -spec(stats_level/2 :: (container(), pos()) -> level()). -spec(if_enabled/3 :: (container(), pos(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). +-spec(notify/3 :: (event_type(), event_props(), reference() | 'none') -> 'ok'). -spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). -endif. @@ -140,7 +142,10 @@ if_enabled(C, P, Fun) -> notify_if(true, Type, Props) -> notify(Type, Props); notify_if(false, _Type, _Props) -> ok. -notify(Type, Props) -> +notify(Type, Props) -> notify(Type, Props, none). + +notify(Type, Props, Ref) -> gen_event:notify(?MODULE, #event{type = Type, props = Props, + reference = Ref, timestamp = os:timestamp()}). diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 1a766b05..4658ecfd 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -94,9 +94,12 @@ ensure_dir_internal(File) -> end. wildcard(Pattern, Dir) -> - {ok, Files} = list_dir(Dir), - {ok, RE} = re:compile(Pattern, [anchored]), - [File || File <- Files, match =:= re:run(File, RE, [{capture, none}])]. + case list_dir(Dir) of + {ok, Files} -> {ok, RE} = re:compile(Pattern, [anchored]), + [File || File <- Files, + match =:= re:run(File, RE, [{capture, none}])]; + {error, _} -> [] + end. list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end). diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ca495733..4f77009c 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -148,53 +148,54 @@ drop_mirrors(QName, Nodes) -> ok. drop_mirror(QName, MirrorNode) -> - rabbit_amqqueue:with( - QName, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - {error, {queue_not_mirrored_on_node, MirrorNode}}; - [QPid] when SPids =:= [] -> - {error, cannot_drop_only_mirror}; - [Pid] -> - rabbit_log:info( - "Dropping queue mirror on node ~p for ~s~n", - [MirrorNode, rabbit_misc:rs(Name)]), - exit(Pid, {shutdown, dropped}), - {ok, dropped} - end - end). + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids }} -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + {error, {queue_not_mirrored_on_node, MirrorNode}}; + [QPid] when SPids =:= [] -> + {error, cannot_drop_only_mirror}; + [Pid] -> + rabbit_log:info( + "Dropping queue mirror on node ~p for ~s~n", + [MirrorNode, rabbit_misc:rs(Name)]), + exit(Pid, {shutdown, dropped}), + {ok, dropped} + end; + {error, not_found} = E -> + E + end. add_mirrors(QName, Nodes, SyncMode) -> [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. add_mirror(QName, MirrorNode, SyncMode) -> - rabbit_amqqueue:with( - QName, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - start_child(Name, MirrorNode, Q, SyncMode); - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> {ok, already_mirrored}; - false -> start_child(Name, MirrorNode, Q, SyncMode) - end - end - end). + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + start_child(Name, MirrorNode, Q, SyncMode); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> {ok, already_mirrored}; + false -> start_child(Name, MirrorNode, Q, SyncMode) + end + end; + {error, not_found} = E -> + E + end. start_child(Name, MirrorNode, Q, SyncMode) -> - case rabbit_misc:with_exit_handler( - rabbit_misc:const(down), - fun () -> - rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) - end) of - {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode); - _ -> ok - end. + rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun () -> + {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]), + rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid, SyncMode) + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 1c63980e..848c4a87 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -53,13 +53,12 @@ -export([parse_arguments/3]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). --export([const_ok/0, const/1]). +-export([const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). -export([pget/2, pget/3, pget_or_die/2, pset/3]). -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). --export([multi_call/2]). -export([os_cmd/1]). -export([gb_sets_difference/2]). -export([version/0, which_applications/0]). @@ -219,7 +218,6 @@ {bad_edge, [digraph:vertex()]}), digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). --spec(const_ok/0 :: () -> 'ok'). -spec(const/1 :: (A) -> thunk(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). @@ -230,8 +228,6 @@ -spec(pset/3 :: (term(), term(), [term()]) -> term()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). --spec(multi_call/2 :: - ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). -spec(version/0 :: () -> string()). @@ -891,7 +887,6 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> {error, Reason} end. -const_ok() -> ok. const(X) -> fun () -> X end. %% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see @@ -950,31 +945,6 @@ append_rpc_all_nodes(Nodes, M, F, A) -> _ -> Res end || Res <- ResL]). -%% A simplified version of gen_server:multi_call/2 with a sane -%% API. This is not in gen_server2 as there is no useful -%% infrastructure there to share. -multi_call(Pids, Req) -> - MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids], - receive_multi_call(MonitorPids, [], []). - -start_multi_call(Pid, Req) when is_pid(Pid) -> - Mref = erlang:monitor(process, Pid), - Pid ! {'$gen_call', {self(), Mref}, Req}, - {Mref, Pid}. - -receive_multi_call([], Good, Bad) -> - {lists:reverse(Good), lists:reverse(Bad)}; -receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) -> - receive - {Mref, Reply} -> - erlang:demonitor(Mref, [flush]), - receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad); - {'DOWN', Mref, _, _, noconnection} -> - receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]); - {'DOWN', Mref, _, _, Reason} -> - receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad]) - end. - os_cmd(Command) -> case os:type() of {win32, _} -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f27f77c6..59873ffc 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -327,6 +327,7 @@ status() -> case is_running() of true -> RunningNodes = cluster_nodes(running), [{running_nodes, RunningNodes}, + {cluster_name, rabbit_nodes:cluster_name()}, {partitions, mnesia_partitions(RunningNodes)}]; false -> [] end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 91be4dcb..42438790 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -22,7 +22,7 @@ connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, - close_connection/2, force_connection_event_refresh/0, tcp_host/1]). + close_connection/2, force_connection_event_refresh/1, tcp_host/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([tcp_listener_addresses/1, tcp_listener_spec/6, @@ -80,7 +80,7 @@ -spec(connection_info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(close_connection/2 :: (pid(), string()) -> 'ok'). --spec(force_connection_event_refresh/0 :: () -> 'ok'). +-spec(force_connection_event_refresh/1 :: (reference()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]). @@ -331,8 +331,8 @@ close_connection(Pid, Explanation) -> false -> throw({error, {not_a_connection_pid, Pid}}) end. -force_connection_event_refresh() -> - [rabbit_reader:force_event_refresh(C) || C <- connections()], +force_connection_event_refresh(Ref) -> + [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], ok. %%-------------------------------------------------------------------- diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 5a1613a7..c5aa8473 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -17,7 +17,8 @@ -module(rabbit_nodes). -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, - is_running/2, is_process_running/2, fqdn_nodename/0]). + is_running/2, is_process_running/2, + cluster_name/0, set_cluster_name/1]). -include_lib("kernel/include/inet.hrl"). @@ -37,7 +38,8 @@ -spec(cookie_hash/0 :: () -> string()). -spec(is_running/2 :: (node(), atom()) -> boolean()). -spec(is_process_running/2 :: (node(), atom()) -> boolean()). --spec(fqdn_nodename/0 :: () -> binary()). +-spec(cluster_name/0 :: () -> binary()). +-spec(set_cluster_name/1 :: (binary()) -> 'ok'). -endif. @@ -111,8 +113,15 @@ is_process_running(Node, Process) -> P when is_pid(P) -> true end. -fqdn_nodename() -> +cluster_name() -> + rabbit_runtime_parameters:value_global( + cluster_name, cluster_name_default()). + +cluster_name_default() -> {ID, _} = rabbit_nodes:parts(node()), {ok, Host} = inet:gethostname(), {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host), list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))). + +set_cluster_name(Name) -> + rabbit_runtime_parameters:set_global(cluster_name, Name). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 919b7376..e00508b4 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -424,22 +424,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% and the journal. State1 = #qistate { dir = Dir, segments = Segments } = recover_journal(State), - {Segments1, Count} = + {Segments1, Count, DirtyCount} = %% Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we %% lose them. Also mark delivered if not clean shutdown. Also - %% find the number of unacked messages. + %% find the number of unacked messages. Also accumulate the + %% dirty count here, so we can call maybe_flush_journal below + %% and avoid unnecessary file system operations. lists:foldl( - fun (Seg, {Segments2, CountAcc}) -> - Segment = #segment { unacked = UnackedCount } = + fun (Seg, {Segments2, CountAcc, DirtyCount}) -> + {Segment = #segment { unacked = UnackedCount }, Dirty} = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), - {segment_store(Segment, Segments2), CountAcc + UnackedCount} - end, {Segments, 0}, all_segment_nums(State1)), - %% Unconditionally flush since the dirty_count doesn't get updated - %% by the above foldl. - State2 = flush_journal(State1 #qistate { segments = Segments1 }), + {segment_store(Segment, Segments2), + CountAcc + UnackedCount, DirtyCount + Dirty} + end, {Segments, 0, 0}, all_segment_nums(State1)), + State2 = maybe_flush_journal(State1 #qistate { segments = Segments1, + dirty_count = DirtyCount }), {Count, State2}. terminate(State = #qistate { journal_handle = JournalHdl, @@ -463,23 +465,25 @@ recover_segment(ContainsCheckFun, CleanShutdown, segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack}, - Segment1) -> + SegmentAndDirtyCount) -> recover_message(ContainsCheckFun(MsgId), CleanShutdown, - Del, RelSeq, Segment1) + Del, RelSeq, SegmentAndDirtyCount) end, - Segment #segment { unacked = UnackedCount + UnackedCountDelta }, + {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, SegEntries1). -recover_message( true, true, _Del, _RelSeq, Segment) -> - Segment; -recover_message( true, false, del, _RelSeq, Segment) -> - Segment; -recover_message( true, false, no_del, RelSeq, Segment) -> - add_to_journal(RelSeq, del, Segment); -recover_message(false, _, del, RelSeq, Segment) -> - add_to_journal(RelSeq, ack, Segment); -recover_message(false, _, no_del, RelSeq, Segment) -> - add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). +recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) -> + SegmentAndDirtyCount; +recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount) -> + SegmentAndDirtyCount; +recover_message( true, false, no_del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, del, Segment), DirtyCount + 1}; +recover_message(false, _, del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1}; +recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, ack, + add_to_journal(RelSeq, del, Segment)), + DirtyCount + 2}. queue_name_to_dir_name(Name = #resource { kind = queue }) -> <<Num:128>> = erlang:md5(term_to_binary(Name)), @@ -651,9 +655,18 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> %% if you call it more than once on the same state. Assumes the counts %% are 0 to start with. load_journal(State) -> - {JournalHdl, State1} = get_journal_handle(State), - {ok, 0} = file_handle_cache:position(JournalHdl, 0), - load_journal_entries(State1). + case is_journal_present(State) of + true -> {JournalHdl, State1} = get_journal_handle(State), + {ok, 0} = file_handle_cache:position(JournalHdl, 0), + load_journal_entries(State1); + false -> State + end. + +is_journal_present(#qistate { journal_handle = undefined, + dir = Dir }) -> + rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)); +is_journal_present(_) -> + true. %% ditto recover_journal(State) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f7c4c0a2..9efb5c96 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,7 +18,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1, +-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -77,7 +77,7 @@ -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). --spec(force_event_refresh/1 :: (pid()) -> 'ok'). +-spec(force_event_refresh/2 :: (pid(), reference()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> @@ -134,8 +134,8 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. -force_event_refresh(Pid) -> - gen_server:cast(Pid, force_event_refresh). +force_event_refresh(Pid, Ref) -> + gen_server:cast(Pid, {force_event_refresh, Ref}). conserve_resources(Pid, Source, Conserve) -> Pid ! {conserve_resources, Source, Conserve}, @@ -156,19 +156,23 @@ server_properties(Protocol) -> [case X of {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), longstr, - list_to_binary(Value)}; + maybe_list_to_binary(Value)}; {BinKey, Type, Value} -> {BinKey, Type, Value} end || X <- RawConfigServerProps ++ - [{product, Product}, - {version, Version}, - {platform, "Erlang/OTP"}, - {copyright, ?COPYRIGHT_MESSAGE}, - {information, ?INFORMATION_MESSAGE}]]], + [{product, Product}, + {version, Version}, + {cluster_name, rabbit_nodes:cluster_name()}, + {platform, "Erlang/OTP"}, + {copyright, ?COPYRIGHT_MESSAGE}, + {information, ?INFORMATION_MESSAGE}]]], %% Filter duplicated properties in favour of config file provided values lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, NormalizedConfigServerProps). +maybe_list_to_binary(V) when is_binary(V) -> V; +maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V). + server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, @@ -396,10 +400,11 @@ handle_other({'$gen_call', From, {info, Items}}, State) -> catch Error -> {error, Error} end), State; -handle_other({'$gen_cast', force_event_refresh}, State) +handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) when ?IS_RUNNING(State) -> - rabbit_event:notify(connection_created, - [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), + rabbit_event:notify( + connection_created, + [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), State; handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. @@ -1069,12 +1074,16 @@ i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount; i(state, #v1{connection_state = ConnectionState, - throttle = #throttle{last_blocked_by = BlockedBy, - last_blocked_at = T}}) -> - Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000, - case {BlockedBy, Recent} of - {flow, true} -> flow; - {_, _} -> ConnectionState + throttle = #throttle{alarmed_by = Alarms, + last_blocked_by = WasBlockedBy, + last_blocked_at = T}}) -> + case Alarms =:= [] andalso %% not throttled by resource alarms + (credit_flow:blocked() %% throttled by flow now + orelse %% throttled by flow recently + (WasBlockedBy =:= flow andalso T =/= never andalso + timer:now_diff(erlang:now(), T) < 5000000)) of + true -> flow; + false -> ConnectionState end; i(Item, #v1{connection = Conn}) -> ic(Item, Conn). diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index bcde0078..18b9fbb8 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -22,6 +22,8 @@ list_component/1, list/2, list_formatted/1, lookup/3, value/3, value/4, info_keys/0]). +-export([set_global/2, value_global/1, value_global/2]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -34,6 +36,7 @@ -> ok_or_error_string()). -spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> ok_or_error_string()). +-spec(set_global/2 :: (atom(), term()) -> 'ok'). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) -> ok_or_error_string()). -spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary()) @@ -48,6 +51,8 @@ -> rabbit_types:infos() | 'not_found'). -spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()). -spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()). +-spec(value_global/1 :: (atom()) -> term() | 'not_found'). +-spec(value_global/2 :: (atom(), term()) -> term()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -endif. @@ -74,6 +79,10 @@ set(_, <<"policy">>, _, _) -> set(VHost, Component, Name, Term) -> set_any(VHost, Component, Name, Term). +set_global(Name, Term) -> + mnesia_update(Name, Term), + ok. + format_error(L) -> {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. @@ -100,16 +109,22 @@ set_any0(VHost, Component, Name, Term) -> E end. +mnesia_update(Key, Term) -> + rabbit_misc:execute_mnesia_transaction(mnesia_update_fun(Key, Term)). + mnesia_update(VHost, Comp, Name, Term) -> - F = fun () -> - Res = case mnesia:read(?TABLE, {VHost, Comp, Name}, read) of - [] -> new; - [Params] -> {old, Params#runtime_parameters.value} + rabbit_misc:execute_mnesia_transaction( + rabbit_vhost:with(VHost, mnesia_update_fun({VHost, Comp, Name}, Term))). + +mnesia_update_fun(Key, Term) -> + fun () -> + Res = case mnesia:read(?TABLE, Key, read) of + [] -> new; + [Params] -> {old, Params#runtime_parameters.value} end, - ok = mnesia:write(?TABLE, c(VHost, Comp, Name, Term), write), - Res - end, - rabbit_misc:execute_mnesia_transaction(rabbit_vhost:with(VHost, F)). + ok = mnesia:write(?TABLE, c(Key, Term), write), + Res + end. clear(_, <<"policy">> , _) -> {error_string, "policies may not be cleared using this method"}; @@ -159,43 +174,46 @@ list_formatted(VHost) -> [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. lookup(VHost, Component, Name) -> - case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of + case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> p(Params) end. -value(VHost, Component, Name) -> - case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of +value(VHost, Comp, Name) -> value0({VHost, Comp, Name}). +value(VHost, Comp, Name, Def) -> value0({VHost, Comp, Name}, Def). + +value_global(Key) -> value0(Key). +value_global(Key, Default) -> value0(Key, Default). + +value0(Key) -> + case lookup0(Key, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> Params#runtime_parameters.value end. -value(VHost, Component, Name, Default) -> - Params = lookup0(VHost, Component, Name, - fun () -> - lookup_missing(VHost, Component, Name, Default) - end), +value0(Key, Default) -> + Params = lookup0(Key, fun () -> lookup_missing(Key, Default) end), Params#runtime_parameters.value. -lookup0(VHost, Component, Name, DefaultFun) -> - case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of +lookup0(Key, DefaultFun) -> + case mnesia:dirty_read(?TABLE, Key) of [] -> DefaultFun(); [R] -> R end. -lookup_missing(VHost, Component, Name, Default) -> +lookup_missing(Key, Default) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read(?TABLE, {VHost, Component, Name}, read) of - [] -> Record = c(VHost, Component, Name, Default), + case mnesia:read(?TABLE, Key, read) of + [] -> Record = c(Key, Default), mnesia:write(?TABLE, Record, write), Record; [R] -> R end end). -c(VHost, Component, Name, Default) -> - #runtime_parameters{key = {VHost, Component, Name}, +c(Key, Default) -> + #runtime_parameters{key = Key, value = Default}. p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 74ff2adb..ce7fe451 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -39,7 +39,6 @@ all_tests() -> application:set_env(rabbit, file_handles_high_watermark, 10, infinity), ok = file_handle_cache:set_limit(10), passed = test_version_equivalance(), - passed = test_multi_call(), passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_rabbit_basic_header_handling(), @@ -66,6 +65,7 @@ all_tests() -> passed = test_amqp_connection_refusal(), passed = test_confirms(), passed = test_with_state(), + passed = test_mcall(), passed = do_if_secondary_node( fun run_cluster_dependent_tests/1, @@ -156,26 +156,6 @@ test_version_equivalance() -> false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"), passed. -test_multi_call() -> - Fun = fun() -> - receive - {'$gen_call', {From, Mref}, request} -> - From ! {Mref, response} - end, - receive - never -> ok - end - end, - Pid1 = spawn(Fun), - Pid2 = spawn(Fun), - Pid3 = spawn(Fun), - exit(Pid2, bang), - {[{Pid1, response}, {Pid3, response}], [{Pid2, _Fail}]} = - rabbit_misc:multi_call([Pid1, Pid2, Pid3], request), - exit(Pid1, bang), - exit(Pid3, bang), - passed. - test_rabbit_basic_header_handling() -> passed = write_table_with_invalid_existing_type_test(), passed = invalid_existing_headers_test(), @@ -1049,6 +1029,9 @@ test_user_management() -> ok = control_action(add_vhost, ["/testhost"]), ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], [{"-p", "/testhost"}]), + {new, _} = rabbit_amqqueue:declare( + rabbit_misc:r(<<"/testhost">>, queue, <<"test">>), + true, false, [], none), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion @@ -1375,6 +1358,82 @@ test_with_state() -> fun (S) -> element(1, S) end), passed. +test_mcall() -> + P1 = spawn(fun gs2_test_listener/0), + register(foo, P1), + global:register_name(gfoo, P1), + + P2 = spawn(fun() -> exit(bang) end), + %% ensure P2 is dead (ignore the race setting up the monitor) + await_exit(P2), + + P3 = spawn(fun gs2_test_crasher/0), + + %% since P2 crashes almost immediately and P3 after receiving its first + %% message, we have to spawn a few more processes to handle the additional + %% cases we're interested in here + register(baz, spawn(fun gs2_test_crasher/0)), + register(bog, spawn(fun gs2_test_crasher/0)), + global:register_name(gbaz, spawn(fun gs2_test_crasher/0)), + + NoNode = rabbit_nodes:make("nonode"), + + Targets = + %% pids + [P1, P2, P3] + ++ + %% registered names + [foo, bar, baz] + ++ + %% {Name, Node} pairs + [{foo, node()}, {bar, node()}, {bog, node()}, {foo, NoNode}] + ++ + %% {global, Name} + [{global, gfoo}, {global, gbar}, {global, gbaz}], + + GoodResults = [{D, goodbye} || D <- [P1, foo, + {foo, node()}, + {global, gfoo}]], + + BadResults = [{P2, noproc}, % died before use + {P3, boom}, % died on first use + {bar, noproc}, % never registered + {baz, boom}, % died on first use + {{bar, node()}, noproc}, % never registered + {{bog, node()}, boom}, % died on first use + {{foo, NoNode}, nodedown}, % invalid node + {{global, gbar}, noproc}, % never registered globally + {{global, gbaz}, boom}], % died on first use + + {Replies, Errors} = gen_server2:mcall([{T, hello} || T <- Targets]), + true = lists:sort(Replies) == lists:sort(GoodResults), + true = lists:sort(Errors) == lists:sort(BadResults), + + %% cleanup (ignore the race setting up the monitor) + P1 ! stop, + await_exit(P1), + passed. + +await_exit(Pid) -> + MRef = erlang:monitor(process, Pid), + receive + {'DOWN', MRef, _, _, _} -> ok + end. + +gs2_test_crasher() -> + receive + {'$gen_call', _From, hello} -> exit(boom) + end. + +gs2_test_listener() -> + receive + {'$gen_call', From, hello} -> + gen_server2:reply(From, goodbye), + gs2_test_listener(); + stop -> + ok + end. + test_statistics_event_receiver(Pid) -> receive Foo -> Pid ! Foo, test_statistics_event_receiver(Pid) @@ -1474,7 +1533,7 @@ test_refresh_events(SecondaryNode) -> expect_events(Tag, Key, Type) -> expect_event(Tag, Key, Type), - rabbit:force_event_refresh(), + rabbit:force_event_refresh(make_ref()), expect_event(Tag, Key, Type). expect_event(Tag, Key, Type) -> diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 90372461..4cb3cacc 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -47,6 +47,7 @@ -rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). +-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). %% ------------------------------------------------------------------- @@ -355,6 +356,32 @@ internal_system_x() -> [name, type, durable, auto_delete, internal, arguments, scratches, policy, decorators]). +cluster_name() -> + {atomic, ok} = mnesia:transaction(fun cluster_name_tx/0), + ok. + +cluster_name_tx() -> + %% mnesia:transform_table/4 does not let us delete records + T = rabbit_runtime_parameters, + mnesia:write_lock_table(T), + Ks = [K || {_VHost, <<"federation">>, <<"local-nodename">>} = K + <- mnesia:all_keys(T)], + case Ks of + [] -> ok; + [K|Tl] -> [{runtime_parameters, _K, Name}] = mnesia:read(T, K, write), + R = {runtime_parameters, cluster_name, Name}, + mnesia:write(T, R, write), + case Tl of + [] -> ok; + _ -> {VHost, _, _} = K, + error_logger:warning_msg( + "Multiple local-nodenames found, picking '~s' " + "from '~s' for cluster name~n", [Name, VHost]) + end + end, + [mnesia:delete(T, K, write) || K <- Ks], + ok. + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 020b5b33..321af4ac 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -643,6 +643,31 @@ drop(AckRequired, State) -> ack([], State) -> {[], State}; +%% optimisation: this head is essentially a partial evaluation of the +%% general case below, for the single-ack case. +ack([SeqId], State) -> + {#msg_status { msg_id = MsgId, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount, + ack_out_counter = AckOutCount }} = + remove_pending_ack(SeqId, State), + IndexState1 = case IndexOnDisk of + true -> rabbit_queue_index:ack([SeqId], IndexState); + false -> IndexState + end, + case MsgOnDisk of + true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); + false -> ok + end, + PCount1 = PCount - one_if(IsPersistent), + {[MsgId], + a(State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + 1 })}; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, @@ -730,20 +755,20 @@ set_ram_duration_target( false -> reduce_memory_use(State1) end). -update_rates(State = #vqstate{ in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, +update_rates(State = #vqstate{ in_counter = InCount, + out_counter = OutCount, + ack_in_counter = AckInCount, ack_out_counter = AckOutCount, - rates = #rates{ in = InRate, - out = OutRate, - ack_in = AckInRate, + rates = #rates{ in = InRate, + out = OutRate, + ack_in = AckInRate, ack_out = AckOutRate, timestamp = TS }}) -> Now = erlang:now(), - Rates = #rates { in = update_rate(Now, TS, InCount, InRate), - out = update_rate(Now, TS, OutCount, OutRate), - ack_in = update_rate(Now, TS, AckInCount, AckInRate), + Rates = #rates { in = update_rate(Now, TS, InCount, InRate), + out = update_rate(Now, TS, OutCount, OutRate), + ack_in = update_rate(Now, TS, AckInCount, AckInRate), ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), timestamp = Now }, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 047bce77..9fa4da44 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -83,9 +83,9 @@ delete(VHostPath) -> %% eventually the termination of that process. Exchange deletion causes %% notifications which must be sent outside the TX rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]), - [{ok,_} = rabbit_amqqueue:delete(Q, false, false) || + [assert_benign(rabbit_amqqueue:delete(Q, false, false)) || Q <- rabbit_amqqueue:list(VHostPath)], - [ok = rabbit_exchange:delete(Name, false) || + [assert_benign(rabbit_exchange:delete(Name, false)) || #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], R = rabbit_misc:execute_mnesia_transaction( with(VHostPath, fun () -> @@ -94,6 +94,18 @@ delete(VHostPath) -> ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), R. +assert_benign(ok) -> ok; +assert_benign({ok, _}) -> ok; +assert_benign({error, not_found}) -> ok; +assert_benign({error, {absent, Q}}) -> + %% We have a durable queue on a down node. Removing the mnesia + %% entries here is safe. If/when the down node restarts, it will + %% clear out the on-disk storage of the queue. + case rabbit_amqqueue:internal_delete(Q#amqqueue.name) of + ok -> ok; + {error, not_found} -> ok + end. + internal_delete(VHostPath) -> [ok = rabbit_auth_backend_internal:clear_permissions( proplists:get_value(user, Info), VHostPath) |