summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.xml17
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/gen_server2.erl88
-rw-r--r--src/rabbit_amqqueue.erl18
-rw-r--r--src/rabbit_amqqueue_process.erl61
-rw-r--r--src/rabbit_auth_backend_dummy.erl49
-rw-r--r--src/rabbit_auth_backend_internal.erl268
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl20
-rw-r--r--src/rabbit_channel_interceptor.erl11
-rw-r--r--src/rabbit_control_main.erl5
-rw-r--r--src/rabbit_direct.erl28
-rw-r--r--src/rabbit_exchange_type_topic.erl25
-rw-r--r--src/rabbit_mirror_queue_misc.erl79
-rw-r--r--src/rabbit_misc.erl32
-rw-r--r--src/rabbit_mnesia.erl1
-rw-r--r--src/rabbit_nodes.erl15
-rw-r--r--src/rabbit_reader.erl19
-rw-r--r--src/rabbit_runtime_parameters.erl64
-rw-r--r--src/rabbit_tests.erl170
-rw-r--r--src/rabbit_upgrade_functions.erl27
-rw-r--r--src/rabbit_variable_queue.erl43
-rw-r--r--src/rabbit_vhost.erl16
23 files changed, 682 insertions, 378 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index d19acd00..a7e42503 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -502,6 +502,23 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_cluster_name</command> <arg choice="req">name</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Sets the cluster name. The cluster name is announced to
+ clients on connection, and used by the federation and
+ shovel plugins to record where a message has been. The
+ cluster name is by default derived from the hostname of
+ the first node in the cluster, but can be changed.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_cluster_name london</screen>
+ <para role="example">
+ This sets the cluster name to "london".
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 00f7341f..19eef65a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -60,7 +60,7 @@
-record(trie_node, {exchange_name, node_id}).
-record(trie_edge, {exchange_name, node_id, word}).
--record(trie_binding, {exchange_name, node_id, destination}).
+-record(trie_binding, {exchange_name, node_id, destination, arguments}).
-record(listener, {node, protocol, host, ip_address, port}).
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 6690d181..ee82bcb3 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -81,6 +81,14 @@
%% process as sys:get_status/1 would). Pass through a function which
%% can be invoked on the state, get back the result. The state is not
%% modified.
+%%
+%% 10) an mcall/1 function has been added for performing multiple
+%% call/3 in parallel. Unlike multi_call, which sends the same request
+%% to same-named processes residing on a supplied list of nodes, it
+%% operates on name/request pairs, where name is anything accepted by
+%% call/3, i.e. a pid, global name, local name, or local name on a
+%% particular node.
+%%
%% All modifications are (C) 2009-2013 GoPivotal, Inc.
@@ -190,6 +198,7 @@
cast/2, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
+ mcall/1,
with_state/2,
enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
@@ -389,6 +398,85 @@ multi_call(Nodes, Name, Req, Timeout)
when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Req, Timeout).
+%%% -----------------------------------------------------------------
+%%% Make multiple calls to multiple servers, given pairs of servers
+%%% and messages.
+%%% Returns: {[{Dest, Reply}], [{Dest, Error}]}
+%%%
+%%% Dest can be pid() | RegName :: atom() |
+%%% {Name :: atom(), Node :: atom()} | {global, Name :: atom()}
+%%%
+%%% A middleman process is used to avoid clogging up the callers
+%%% message queue.
+%%% -----------------------------------------------------------------
+mcall(CallSpecs) ->
+ Tag = make_ref(),
+ {_, MRef} = spawn_monitor(
+ fun() ->
+ Refs = lists:foldl(
+ fun ({Dest, _Request}=S, Dict) ->
+ dict:store(do_mcall(S), Dest, Dict)
+ end, dict:new(), CallSpecs),
+ collect_replies(Tag, Refs, [], [])
+ end),
+ receive
+ {'DOWN', MRef, _, _, {Tag, Result}} -> Result;
+ {'DOWN', MRef, _, _, Reason} -> exit(Reason)
+ end.
+
+do_mcall({{global,Name}=Dest, Request}) ->
+ %% whereis_name is simply an ets lookup, and is precisely what
+ %% global:send/2 does, yet we need a Ref to put in the call to the
+ %% server, so invoking whereis_name makes a lot more sense here.
+ case global:whereis_name(Name) of
+ Pid when is_pid(Pid) ->
+ MRef = erlang:monitor(process, Pid),
+ catch msend(Pid, MRef, Request),
+ MRef;
+ undefined ->
+ Ref = make_ref(),
+ self() ! {'DOWN', Ref, process, Dest, noproc},
+ Ref
+ end;
+do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) ->
+ {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6
+ catch msend(Dest, MRef, Request),
+ MRef;
+do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) ->
+ MRef = erlang:monitor(process, Dest),
+ catch msend(Dest, MRef, Request),
+ MRef.
+
+msend(Dest, MRef, Request) ->
+ erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]).
+
+collect_replies(Tag, Refs, Replies, Errors) ->
+ case dict:size(Refs) of
+ 0 -> exit({Tag, {Replies, Errors}});
+ _ -> receive
+ {MRef, Reply} ->
+ {Refs1, Replies1} = handle_call_result(MRef, Reply,
+ Refs, Replies),
+ collect_replies(Tag, Refs1, Replies1, Errors);
+ {'DOWN', MRef, _, _, Reason} ->
+ Reason1 = case Reason of
+ noconnection -> nodedown;
+ _ -> Reason
+ end,
+ {Refs1, Errors1} = handle_call_result(MRef, Reason1,
+ Refs, Errors),
+ collect_replies(Tag, Refs1, Replies, Errors1)
+ end
+ end.
+
+handle_call_result(MRef, Result, Refs, AccList) ->
+ %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2}
+ %% here, so we must cope with MRefs that we've already seen and erased
+ case dict:find(MRef, Refs) of
+ {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]};
+ _ -> {Refs, AccList}
+ end.
+
%% -----------------------------------------------------------------
%% Apply a function to a generic server's state.
%% -----------------------------------------------------------------
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a7438a3f..c0478579 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -354,14 +354,14 @@ with(Name, F, E) ->
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
- %% with the QPid.
+ %% with the QPid. F() should be written s.t. that this
+ %% cannot happen, so we bail if it does since that
+ %% indicates a code bug and we don't want to get stuck in
+ %% the retry loop.
rabbit_misc:with_exit_handler(
- fun () ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> E(not_found_or_absent_dirty(Name));
- false -> timer:sleep(25),
- with(Name, F, E)
- end
+ fun () -> false = rabbit_misc:is_process_alive(QPid),
+ timer:sleep(25),
+ with(Name, F, E)
end, fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
@@ -507,8 +507,8 @@ force_event_refresh(Ref) ->
force_event_refresh(QNames, Ref) ->
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
- {_, Bad} = rabbit_misc:multi_call(
- [Q#amqqueue.pid || Q <- Qs], {force_event_refresh, Ref}),
+ {_, Bad} = gen_server2:mcall(
+ [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]),
FailedPids = [Pid || {Pid, _Reason} <- Bad],
Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
lists:member(Pid, FailedPids)],
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d453c6d3..a1997376 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -20,8 +20,8 @@
-behaviour(gen_server2).
--define(SYNC_INTERVAL, 25). %% milliseconds
--define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(SYNC_INTERVAL, 200). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster
-export([start_link/1, info_keys/0]).
@@ -328,10 +328,13 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
-next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+next_state(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_to_channel = MTC}) ->
assert_invariant(State),
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}),
+ MTC1 = confirm_messages(MsgIds, MTC),
+ State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1},
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -412,9 +415,9 @@ maybe_send_drained(WasEmpty, State) ->
end,
State.
-confirm_messages([], State) ->
- State;
-confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
+confirm_messages([], MTC) ->
+ MTC;
+confirm_messages(MsgIds, MTC) ->
{CMs, MTC1} =
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
@@ -428,7 +431,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
end
end, {gb_trees:empty(), MTC}, MsgIds),
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
- State#q{msg_id_to_channel = MTC1}.
+ MTC1.
send_or_record_confirm(#delivery{confirm = false}, State) ->
{never, State};
@@ -448,23 +451,22 @@ send_or_record_confirm(#delivery{confirm = true,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
-send_mandatory(#delivery{mandatory = false}) ->
+send_mandatory(#delivery{mandatory = false}) ->
ok;
send_mandatory(#delivery{mandatory = true,
sender = SenderPid,
msg_seq_no = MsgSeqNo}) ->
gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
-discard(#delivery{sender = SenderPid,
- msg_seq_no = MsgSeqNo,
- message = #basic_message{id = MsgId}}, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- case MsgSeqNo of
- undefined -> State;
- _ -> confirm_messages([MsgId], State)
- end,
+discard(#delivery{confirm = Confirm,
+ sender = SenderPid,
+ message = #basic_message{id = MsgId}}, BQ, BQS, MTC) ->
+ MTC1 = case Confirm of
+ true -> confirm_messages([MsgId], MTC);
+ false -> MTC
+ end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
- State1#q{backing_queue_state = BQS1}.
+ {BQS1, MTC1}.
run_message_queue(State) -> run_message_queue(false, State).
@@ -487,20 +489,22 @@ run_message_queue(ActiveConsumersChanged, State) ->
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ msg_id_to_channel = MTC}) ->
case rabbit_queue_consumers:deliver(
fun (true) -> true = BQ:is_empty(BQS),
{AckTag, BQS1} = BQ:publish_delivered(
Message, Props, SenderPid, BQS),
- {{Message, Delivered, AckTag},
- State#q{backing_queue_state = BQS1}};
+ {{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
- discard(Delivery, State)}
+ discard(Delivery, BQ, BQS, MTC)}
end, qname(State), State#q.consumers) of
- {delivered, ActiveConsumersChanged, State1, Consumers} ->
+ {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} ->
{delivered, maybe_notify_decorators(
ActiveConsumersChanged,
- State1#q{consumers = Consumers})};
+ State#q{backing_queue_state = BQS1,
+ msg_id_to_channel = MTC1,
+ consumers = Consumers})};
{undelivered, ActiveConsumersChanged, Consumers} ->
{undelivered, maybe_notify_decorators(
ActiveConsumersChanged,
@@ -512,7 +516,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, State),
+ Props = message_properties(Message, Confirm, State1),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State2 = State1#q{backing_queue_state = BQS1},
case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
@@ -522,8 +526,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{delivered, State3} ->
State3;
%% The next one is an optimisation
- {undelivered, State3 = #q{ttl = 0, dlx = undefined}} ->
- discard(Delivery, State3);
+ {undelivered, State3 = #q{ttl = 0, dlx = undefined,
+ backing_queue_state = BQS2,
+ msg_id_to_channel = MTC}} ->
+ {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
+ State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
diff --git a/src/rabbit_auth_backend_dummy.erl b/src/rabbit_auth_backend_dummy.erl
new file mode 100644
index 00000000..1a3db732
--- /dev/null
+++ b/src/rabbit_auth_backend_dummy.erl
@@ -0,0 +1,49 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_auth_backend_dummy).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_backend).
+
+-export([description/0]).
+-export([user/0]).
+-export([check_user_login/2, check_vhost_access/2, check_resource_access/3]).
+
+-ifdef(use_specs).
+
+-spec(user/0 :: () -> rabbit_types:user()).
+
+-endif.
+
+%% A user to be used by the direct client when permission checks are
+%% not needed. This user can do anything AMQPish.
+user() -> #user{username = <<"dummy">>,
+ tags = [],
+ auth_backend = ?MODULE,
+ impl = none}.
+
+%% Implementation of rabbit_auth_backend
+
+description() ->
+ [{name, <<"Dummy">>},
+ {description, <<"Database for the dummy user">>}].
+
+check_user_login(_, _) ->
+ {refused, "cannot log in conventionally as dummy user", []}.
+
+check_vhost_access(#user{}, _VHostPath) -> true.
+check_resource_access(#user{}, #resource{}, _Permission) -> true.
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 61919d05..ebeac1f7 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -22,15 +22,18 @@
-export([description/0]).
-export([check_user_login/2, check_vhost_access/2, check_resource_access/3]).
--export([add_user/2, delete_user/1, change_password/2, set_tags/2,
- list_users/0, user_info_keys/0, lookup_user/1, clear_password/1]).
--export([make_salt/0, check_password/2, change_password_hash/2,
- hash_password/1]).
--export([set_permissions/5, clear_permissions/2,
- list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
- list_user_vhost_permissions/2, perms_info_keys/0,
- vhost_perms_info_keys/0, user_perms_info_keys/0,
- user_vhost_perms_info_keys/0]).
+-export([add_user/2, delete_user/1, lookup_user/1,
+ change_password/2, clear_password/1,
+ hash_password/1, change_password_hash/2,
+ set_tags/2, set_permissions/5, clear_permissions/2]).
+-export([user_info_keys/0, perms_info_keys/0,
+ user_perms_info_keys/0, vhost_perms_info_keys/0,
+ user_vhost_perms_info_keys/0,
+ list_users/0, list_permissions/0,
+ list_user_permissions/1, list_vhost_permissions/1,
+ list_user_vhost_permissions/2]).
+
+%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -38,45 +41,39 @@
-spec(add_user/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok').
-spec(delete_user/1 :: (rabbit_types:username()) -> 'ok').
+-spec(lookup_user/1 :: (rabbit_types:username())
+ -> rabbit_types:ok(rabbit_types:internal_user())
+ | rabbit_types:error('not_found')).
-spec(change_password/2 :: (rabbit_types:username(), rabbit_types:password())
-> 'ok').
-spec(clear_password/1 :: (rabbit_types:username()) -> 'ok').
--spec(make_salt/0 :: () -> binary()).
--spec(check_password/2 :: (rabbit_types:password(),
- rabbit_types:password_hash()) -> boolean()).
--spec(change_password_hash/2 :: (rabbit_types:username(),
- rabbit_types:password_hash()) -> 'ok').
-spec(hash_password/1 :: (rabbit_types:password())
-> rabbit_types:password_hash()).
+-spec(change_password_hash/2 :: (rabbit_types:username(),
+ rabbit_types:password_hash()) -> 'ok').
-spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok').
--spec(list_users/0 :: () -> [rabbit_types:infos()]).
--spec(user_info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(lookup_user/1 :: (rabbit_types:username())
- -> rabbit_types:ok(rabbit_types:internal_user())
- | rabbit_types:error('not_found')).
-spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(),
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
-> 'ok').
+-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(user_vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(list_users/0 :: () -> [rabbit_types:infos()]).
-spec(list_permissions/0 :: () -> [rabbit_types:infos()]).
--spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_user_permissions/1 ::
(rabbit_types:username()) -> [rabbit_types:infos()]).
+-spec(list_vhost_permissions/1 ::
+ (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_user_vhost_permissions/2 ::
(rabbit_types:username(), rabbit_types:vhost())
-> [rabbit_types:infos()]).
--spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(user_vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
+
-endif.
%%----------------------------------------------------------------------------
-
--define(PERMS_INFO_KEYS, [configure, write, read]).
--define(USER_INFO_KEYS, [user, tags]).
-
%% Implementation of rabbit_auth_backend
description() ->
@@ -85,11 +82,14 @@ description() ->
check_user_login(Username, []) ->
internal_check_user_login(Username, fun(_) -> true end);
-check_user_login(Username, [{password, Password}]) ->
+check_user_login(Username, [{password, Cleartext}]) ->
internal_check_user_login(
- Username, fun(#internal_user{password_hash = Hash}) ->
- check_password(Password, Hash)
- end);
+ Username,
+ fun (#internal_user{password_hash = <<Salt:4/binary, Hash/binary>>}) ->
+ Hash =:= salted_md5(Salt, Cleartext);
+ (#internal_user{}) ->
+ false
+ end);
check_user_login(Username, AuthProps) ->
exit({unknown_auth_props, Username, AuthProps}).
@@ -145,42 +145,43 @@ permission_index(read) -> #permission.read.
add_user(Username, Password) ->
rabbit_log:info("Creating user '~s'~n", [Username]),
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_user, Username}) of
- [] ->
- ok = mnesia:write(
- rabbit_user,
- #internal_user{username = Username,
- password_hash =
- hash_password(Password),
- tags = []},
- write);
- _ ->
- mnesia:abort({user_already_exists, Username})
- end
- end),
- R.
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_user, Username}) of
+ [] ->
+ ok = mnesia:write(
+ rabbit_user,
+ #internal_user{username = Username,
+ password_hash =
+ hash_password(Password),
+ tags = []},
+ write);
+ _ ->
+ mnesia:abort({user_already_exists, Username})
+ end
+ end).
delete_user(Username) ->
rabbit_log:info("Deleting user '~s'~n", [Username]),
- R = rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- ok = mnesia:delete({rabbit_user, Username}),
- [ok = mnesia:delete_object(
- rabbit_user_permission, R, write) ||
- R <- mnesia:match_object(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = '_'},
- permission = '_'},
- write)],
- ok
- end)),
- R.
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ ok = mnesia:delete({rabbit_user, Username}),
+ [ok = mnesia:delete_object(
+ rabbit_user_permission, R, write) ||
+ R <- mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = '_'},
+ permission = '_'},
+ write)],
+ ok
+ end)).
+
+lookup_user(Username) ->
+ rabbit_misc:dirty_read({rabbit_user, Username}).
change_password(Username, Password) ->
rabbit_log:info("Changing password for '~s'~n", [Username]),
@@ -190,70 +191,44 @@ clear_password(Username) ->
rabbit_log:info("Clearing password for '~s'~n", [Username]),
change_password_hash(Username, <<"">>).
-change_password_hash(Username, PasswordHash) ->
- R = update_user(Username, fun(User) ->
- User#internal_user{
- password_hash = PasswordHash }
- end),
- R.
-
hash_password(Cleartext) ->
- Salt = make_salt(),
- Hash = salted_md5(Salt, Cleartext),
- <<Salt/binary, Hash/binary>>.
-
-check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) ->
- Hash =:= salted_md5(Salt, Cleartext);
-check_password(_Cleartext, _Any) ->
- false.
-
-make_salt() ->
{A1,A2,A3} = now(),
random:seed(A1, A2, A3),
Salt = random:uniform(16#ffffffff),
- <<Salt:32>>.
+ SaltBin = <<Salt:32>>,
+ Hash = salted_md5(SaltBin, Cleartext),
+ <<SaltBin/binary, Hash/binary>>.
+
+change_password_hash(Username, PasswordHash) ->
+ update_user(Username, fun(User) ->
+ User#internal_user{
+ password_hash = PasswordHash }
+ end).
salted_md5(Salt, Cleartext) ->
Salted = <<Salt/binary, Cleartext/binary>>,
erlang:md5(Salted).
set_tags(Username, Tags) ->
- rabbit_log:info("Setting user tags for user '~s' to ~p~n", [Username, Tags]),
- R = update_user(Username, fun(User) ->
- User#internal_user{tags = Tags}
- end),
- R.
-
-update_user(Username, Fun) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- {ok, User} = lookup_user(Username),
- ok = mnesia:write(rabbit_user, Fun(User), write)
- end)).
-
-list_users() ->
- [[{user, Username}, {tags, Tags}] ||
- #internal_user{username = Username, tags = Tags} <-
- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
-
-user_info_keys() -> ?USER_INFO_KEYS.
-
-lookup_user(Username) ->
- rabbit_misc:dirty_read({rabbit_user, Username}).
-
-validate_regexp(RegexpBin) ->
- Regexp = binary_to_list(RegexpBin),
- case re:compile(Regexp) of
- {ok, _} -> ok;
- {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
- end.
+ rabbit_log:info("Setting user tags for user '~s' to ~p~n",
+ [Username, Tags]),
+ update_user(Username, fun(User) ->
+ User#internal_user{tags = Tags}
+ end).
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
- rabbit_log:info("Setting permissions for '~s' in '~s' to '~s', '~s', '~s'~n",
+ rabbit_log:info("Setting permissions for "
+ "'~s' in '~s' to '~s', '~s', '~s'~n",
[Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]),
- lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
+ lists:map(
+ fun (RegexpBin) ->
+ Regexp = binary_to_list(RegexpBin),
+ case re:compile(Regexp) of
+ {ok, _} -> ok;
+ {error, Reason} -> throw({error, {invalid_regexp,
+ Regexp, Reason}})
+ end
+ end, [ConfigurePerm, WritePerm, ReadPerm]),
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
@@ -269,7 +244,6 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
write)
end)).
-
clear_permissions(Username, VHostPath) ->
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
@@ -280,32 +254,36 @@ clear_permissions(Username, VHostPath) ->
virtual_host = VHostPath}})
end)).
+update_user(Username, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ {ok, User} = lookup_user(Username),
+ ok = mnesia:write(rabbit_user, Fun(User), write)
+ end)).
+
+%%----------------------------------------------------------------------------
+%% Listing
+
+-define(PERMS_INFO_KEYS, [configure, write, read]).
+-define(USER_INFO_KEYS, [user, tags]).
+
+user_info_keys() -> ?USER_INFO_KEYS.
+
perms_info_keys() -> [user, vhost | ?PERMS_INFO_KEYS].
vhost_perms_info_keys() -> [user | ?PERMS_INFO_KEYS].
user_perms_info_keys() -> [vhost | ?PERMS_INFO_KEYS].
user_vhost_perms_info_keys() -> ?PERMS_INFO_KEYS.
+list_users() ->
+ [[{user, Username}, {tags, Tags}] ||
+ #internal_user{username = Username, tags = Tags} <-
+ mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
+
list_permissions() ->
list_permissions(perms_info_keys(), match_user_vhost('_', '_')).
-list_vhost_permissions(VHostPath) ->
- list_permissions(
- vhost_perms_info_keys(),
- rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))).
-
-list_user_permissions(Username) ->
- list_permissions(
- user_perms_info_keys(),
- rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))).
-
-list_user_vhost_permissions(Username, VHostPath) ->
- list_permissions(
- user_vhost_perms_info_keys(),
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath, match_user_vhost(Username, VHostPath))).
-
-filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)].
-
list_permissions(Keys, QueryThunk) ->
[filter_props(Keys, [{user, Username},
{vhost, VHostPath},
@@ -320,6 +298,24 @@ list_permissions(Keys, QueryThunk) ->
%% TODO: use dirty ops instead
rabbit_misc:execute_mnesia_transaction(QueryThunk)].
+filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)].
+
+list_user_permissions(Username) ->
+ list_permissions(
+ user_perms_info_keys(),
+ rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))).
+
+list_vhost_permissions(VHostPath) ->
+ list_permissions(
+ vhost_perms_info_keys(),
+ rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))).
+
+list_user_vhost_permissions(Username, VHostPath) ->
+ list_permissions(
+ user_vhost_perms_info_keys(),
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath, match_user_vhost(Username, VHostPath))).
+
match_user_vhost(Username, VHostPath) ->
fun () -> mnesia:match_object(
rabbit_user_permission,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 11e6bd38..bb9c61a8 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -169,7 +169,7 @@ add(Binding, InnerFun) ->
ok ->
case mnesia:read({rabbit_route, B}) of
[] -> add(Src, Dst, B);
- [_] -> fun rabbit_misc:const_ok/0
+ [_] -> fun () -> ok end
end;
{error, _} = Err ->
rabbit_misc:const(Err)
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8157e82d..7907c96c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -497,15 +497,14 @@ check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
check_user_id_header(#'P_basic'{user_id = Username},
#ch{user = #user{username = Username}}) ->
ok;
+check_user_id_header(
+ #'P_basic'{}, #ch{user = #user{auth_backend = rabbit_auth_backend_dummy}}) ->
+ ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
- #ch{user = #user{username = Actual,
- tags = Tags}}) ->
- case lists:member(impersonator, Tags) of
- true -> ok;
- false -> precondition_failed(
- "user_id property set to '~s' but authenticated user was "
- "'~s'", [Claimed, Actual])
- end.
+ #ch{user = #user{username = Actual}}) ->
+ precondition_failed(
+ "user_id property set to '~s' but authenticated user was '~s'",
+ [Claimed, Actual]).
check_expiration_header(Props) ->
case rabbit_basic:parse_expiration(Props) of
@@ -1440,8 +1439,9 @@ notify_limiter(Limiter, Acked) ->
end
end.
-deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
- mandatory = false},
+deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
+ confirm = false,
+ mandatory = false},
[]}, State) -> %% optimisation
?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
State;
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 2bd22579..49f7e388 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -51,8 +51,11 @@ behaviour_info(_Other) ->
%%----------------------------------------------------------------------------
-intercept_method(#'basic.publish'{} = M, _VHost) ->
- M;
+intercept_method(#'basic.publish'{} = M, _VHost) -> M;
+intercept_method(#'basic.ack'{} = M, _VHost) -> M;
+intercept_method(#'basic.nack'{} = M, _VHost) -> M;
+intercept_method(#'basic.reject'{} = M, _VHost) -> M;
+intercept_method(#'basic.credit'{} = M, _VHost) -> M;
intercept_method(M, VHost) ->
intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))).
@@ -87,5 +90,7 @@ select(Method) ->
validate_method(M, M2) ->
rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
+%% keep dialyzer happy
+-spec internal_error(string(), [any()]) -> no_return().
internal_error(Format, Args) ->
- rabbit_misc:protocol_error(internal_error, Format, Args). \ No newline at end of file
+ rabbit_misc:protocol_error(internal_error, Format, Args).
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index f3463286..746f2bdb 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -90,6 +90,7 @@
status,
environment,
report,
+ set_cluster_name,
eval,
close_connection,
@@ -527,6 +528,10 @@ action(report, Node, _Args, _Opts, Inform) ->
[print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
ok;
+action(set_cluster_name, Node, [Name], _Opts, Inform) ->
+ Inform("Setting cluster name to ~s", [Name]),
+ rpc_call(Node, rabbit_nodes, set_cluster_name, [list_to_binary(Name)]);
+
action(eval, Node, [Expr], _Opts, _Inform) ->
case erl_scan:string(Expr) of
{ok, Scanned, _} ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index e6e4b4cc..c372d5f1 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -31,7 +31,7 @@
-spec(force_event_refresh/1 :: (reference()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
--spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user() |
+-spec(connect/5 :: (('nouser' |
{rabbit_types:username(), rabbit_types:password()}),
rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
@@ -67,37 +67,35 @@ list() ->
%%----------------------------------------------------------------------------
-connect(User = #user{}, VHost, Protocol, Pid, Infos) ->
- try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> ok = pg_local:join(rabbit_direct, Pid),
- rabbit_event:notify(connection_created, Infos),
- {ok, {User, rabbit_reader:server_properties(Protocol)}}
- catch
- exit:#amqp_error{name = access_refused} ->
- {error, access_refused}
- end;
-
connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
connect0(fun () -> rabbit_access_control:check_user_pass_login(
Username, Password) end,
VHost, Protocol, Pid, Infos);
-connect(Username, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> rabbit_access_control:check_user_login(
- Username, []) end,
+connect(nouser, VHost, Protocol, Pid, Infos) ->
+ connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end,
VHost, Protocol, Pid, Infos).
connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
true -> case AuthFun() of
{ok, User} ->
- connect(User, VHost, Protocol, Pid, Infos);
+ connect1(User, VHost, Protocol, Pid, Infos);
{refused, _M, _A} ->
{error, {auth_failure, "Refused"}}
end;
false -> {error, broker_not_found_on_node}
end.
+connect1(User, VHost, Protocol, Pid, Infos) ->
+ try rabbit_access_control:check_vhost_access(User, VHost) of
+ ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_event:notify(connection_created, Infos),
+ {ok, {User, rabbit_reader:server_properties(Protocol)}}
+ catch
+ exit:#amqp_error{name = access_refused} ->
+ {error, access_refused}
+ end.
start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, Collector) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 8ba29deb..27b8d1e6 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -79,9 +79,9 @@ remove_bindings(transaction, _X, Bs) ->
[begin
Path = [{FinalNode, _} | _] =
follow_down_get_path(X, split_topic_key(K)),
- trie_remove_binding(X, FinalNode, D),
+ trie_remove_binding(X, FinalNode, D, Args),
remove_path_if_empty(X, Path)
- end || #binding{source = X, key = K, destination = D} <- Bs],
+ end || #binding{source = X, key = K, destination = D, args = Args} <- Bs],
ok;
remove_bindings(none, _X, _Bs) ->
ok.
@@ -91,9 +91,10 @@ assert_args_equivalence(X, Args) ->
%%----------------------------------------------------------------------------
-internal_add_binding(#binding{source = X, key = K, destination = D}) ->
+internal_add_binding(#binding{source = X, key = K, destination = D,
+ args = Args}) ->
FinalNode = follow_down_create(X, split_topic_key(K)),
- trie_add_binding(X, FinalNode, D),
+ trie_add_binding(X, FinalNode, D, Args),
ok.
trie_match(X, Words) ->
@@ -176,7 +177,8 @@ trie_bindings(X, Node) ->
MatchHead = #topic_trie_binding{
trie_binding = #trie_binding{exchange_name = X,
node_id = Node,
- destination = '$1'}},
+ destination = '$1',
+ arguments = '_'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
trie_update_node_counts(X, Node, Field, Delta) ->
@@ -213,20 +215,21 @@ trie_edge_op(X, FromNode, ToNode, W, Op) ->
node_id = ToNode},
write).
-trie_add_binding(X, Node, D) ->
+trie_add_binding(X, Node, D, Args) ->
trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
- trie_binding_op(X, Node, D, fun mnesia:write/3).
+ trie_binding_op(X, Node, D, Args, fun mnesia:write/3).
-trie_remove_binding(X, Node, D) ->
+trie_remove_binding(X, Node, D, Args) ->
trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
- trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
+ trie_binding_op(X, Node, D, Args, fun mnesia:delete_object/3).
-trie_binding_op(X, Node, D, Op) ->
+trie_binding_op(X, Node, D, Args, Op) ->
ok = Op(rabbit_topic_trie_binding,
#topic_trie_binding{
trie_binding = #trie_binding{exchange_name = X,
node_id = Node,
- destination = D}},
+ destination = D,
+ arguments = Args}},
write).
trie_remove_all_nodes(X) ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ca495733..4f77009c 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -148,53 +148,54 @@ drop_mirrors(QName, Nodes) ->
ok.
drop_mirror(QName, MirrorNode) ->
- rabbit_amqqueue:with(
- QName,
- fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) ->
- case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] ->
- {error, {queue_not_mirrored_on_node, MirrorNode}};
- [QPid] when SPids =:= [] ->
- {error, cannot_drop_only_mirror};
- [Pid] ->
- rabbit_log:info(
- "Dropping queue mirror on node ~p for ~s~n",
- [MirrorNode, rabbit_misc:rs(Name)]),
- exit(Pid, {shutdown, dropped}),
- {ok, dropped}
- end
- end).
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids }} ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ {error, {queue_not_mirrored_on_node, MirrorNode}};
+ [QPid] when SPids =:= [] ->
+ {error, cannot_drop_only_mirror};
+ [Pid] ->
+ rabbit_log:info(
+ "Dropping queue mirror on node ~p for ~s~n",
+ [MirrorNode, rabbit_misc:rs(Name)]),
+ exit(Pid, {shutdown, dropped}),
+ {ok, dropped}
+ end;
+ {error, not_found} = E ->
+ E
+ end.
add_mirrors(QName, Nodes, SyncMode) ->
[add_mirror(QName, Node, SyncMode) || Node <- Nodes],
ok.
add_mirror(QName, MirrorNode, SyncMode) ->
- rabbit_amqqueue:with(
- QName,
- fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
- case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] ->
- start_child(Name, MirrorNode, Q, SyncMode);
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true -> {ok, already_mirrored};
- false -> start_child(Name, MirrorNode, Q, SyncMode)
- end
- end
- end).
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ start_child(Name, MirrorNode, Q, SyncMode);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true -> {ok, already_mirrored};
+ false -> start_child(Name, MirrorNode, Q, SyncMode)
+ end
+ end;
+ {error, not_found} = E ->
+ E
+ end.
start_child(Name, MirrorNode, Q, SyncMode) ->
- case rabbit_misc:with_exit_handler(
- rabbit_misc:const(down),
- fun () ->
- rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
- end) of
- {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid, SyncMode);
- _ -> ok
- end.
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun () ->
+ {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child(
+ MirrorNode, [Q]),
+ rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode)
+ end).
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 1c63980e..848c4a87 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -53,13 +53,12 @@
-export([parse_arguments/3]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
--export([const_ok/0, const/1]).
+-export([const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
-export([pget/2, pget/3, pget_or_die/2, pset/3]).
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
--export([multi_call/2]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
-export([version/0, which_applications/0]).
@@ -219,7 +218,6 @@
{bad_edge, [digraph:vertex()]}),
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
--spec(const_ok/0 :: () -> 'ok').
-spec(const/1 :: (A) -> thunk(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
@@ -230,8 +228,6 @@
-spec(pset/3 :: (term(), term(), [term()]) -> term()).
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
--spec(multi_call/2 ::
- ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
-spec(version/0 :: () -> string()).
@@ -891,7 +887,6 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
{error, Reason}
end.
-const_ok() -> ok.
const(X) -> fun () -> X end.
%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
@@ -950,31 +945,6 @@ append_rpc_all_nodes(Nodes, M, F, A) ->
_ -> Res
end || Res <- ResL]).
-%% A simplified version of gen_server:multi_call/2 with a sane
-%% API. This is not in gen_server2 as there is no useful
-%% infrastructure there to share.
-multi_call(Pids, Req) ->
- MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids],
- receive_multi_call(MonitorPids, [], []).
-
-start_multi_call(Pid, Req) when is_pid(Pid) ->
- Mref = erlang:monitor(process, Pid),
- Pid ! {'$gen_call', {self(), Mref}, Req},
- {Mref, Pid}.
-
-receive_multi_call([], Good, Bad) ->
- {lists:reverse(Good), lists:reverse(Bad)};
-receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) ->
- receive
- {Mref, Reply} ->
- erlang:demonitor(Mref, [flush]),
- receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad);
- {'DOWN', Mref, _, _, noconnection} ->
- receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]);
- {'DOWN', Mref, _, _, Reason} ->
- receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad])
- end.
-
os_cmd(Command) ->
case os:type() of
{win32, _} ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index f27f77c6..59873ffc 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -327,6 +327,7 @@ status() ->
case is_running() of
true -> RunningNodes = cluster_nodes(running),
[{running_nodes, RunningNodes},
+ {cluster_name, rabbit_nodes:cluster_name()},
{partitions, mnesia_partitions(RunningNodes)}];
false -> []
end.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 5a1613a7..c5aa8473 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -17,7 +17,8 @@
-module(rabbit_nodes).
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
- is_running/2, is_process_running/2, fqdn_nodename/0]).
+ is_running/2, is_process_running/2,
+ cluster_name/0, set_cluster_name/1]).
-include_lib("kernel/include/inet.hrl").
@@ -37,7 +38,8 @@
-spec(cookie_hash/0 :: () -> string()).
-spec(is_running/2 :: (node(), atom()) -> boolean()).
-spec(is_process_running/2 :: (node(), atom()) -> boolean()).
--spec(fqdn_nodename/0 :: () -> binary()).
+-spec(cluster_name/0 :: () -> binary()).
+-spec(set_cluster_name/1 :: (binary()) -> 'ok').
-endif.
@@ -111,8 +113,15 @@ is_process_running(Node, Process) ->
P when is_pid(P) -> true
end.
-fqdn_nodename() ->
+cluster_name() ->
+ rabbit_runtime_parameters:value_global(
+ cluster_name, cluster_name_default()).
+
+cluster_name_default() ->
{ID, _} = rabbit_nodes:parts(node()),
{ok, Host} = inet:gethostname(),
{ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
+
+set_cluster_name(Name) ->
+ rabbit_runtime_parameters:set_global(cluster_name, Name).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 833c6e27..9ffcd203 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -156,19 +156,23 @@ server_properties(Protocol) ->
[case X of
{KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
longstr,
- list_to_binary(Value)};
+ maybe_list_to_binary(Value)};
{BinKey, Type, Value} -> {BinKey, Type, Value}
end || X <- RawConfigServerProps ++
- [{product, Product},
- {version, Version},
- {platform, "Erlang/OTP"},
- {copyright, ?COPYRIGHT_MESSAGE},
- {information, ?INFORMATION_MESSAGE}]]],
+ [{product, Product},
+ {version, Version},
+ {cluster_name, rabbit_nodes:cluster_name()},
+ {platform, "Erlang/OTP"},
+ {copyright, ?COPYRIGHT_MESSAGE},
+ {information, ?INFORMATION_MESSAGE}]]],
%% Filter duplicated properties in favour of config file provided values
lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
NormalizedConfigServerProps).
+maybe_list_to_binary(V) when is_binary(V) -> V;
+maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V).
+
server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
@@ -958,6 +962,9 @@ validate_negotiated_integer_value(Field, Min, ClientValue) ->
ok
end.
+%% keep dialyzer happy
+-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) ->
+ no_return().
fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
{S1, S2} = case MinOrMax of
min -> {lower, minimum};
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index bcde0078..18b9fbb8 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -22,6 +22,8 @@
list_component/1, list/2, list_formatted/1, lookup/3,
value/3, value/4, info_keys/0]).
+-export([set_global/2, value_global/1, value_global/2]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -34,6 +36,7 @@
-> ok_or_error_string()).
-spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term())
-> ok_or_error_string()).
+-spec(set_global/2 :: (atom(), term()) -> 'ok').
-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
-> ok_or_error_string()).
-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary())
@@ -48,6 +51,8 @@
-> rabbit_types:infos() | 'not_found').
-spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()).
-spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()).
+-spec(value_global/1 :: (atom()) -> term() | 'not_found').
+-spec(value_global/2 :: (atom(), term()) -> term()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-endif.
@@ -74,6 +79,10 @@ set(_, <<"policy">>, _, _) ->
set(VHost, Component, Name, Term) ->
set_any(VHost, Component, Name, Term).
+set_global(Name, Term) ->
+ mnesia_update(Name, Term),
+ ok.
+
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
@@ -100,16 +109,22 @@ set_any0(VHost, Component, Name, Term) ->
E
end.
+mnesia_update(Key, Term) ->
+ rabbit_misc:execute_mnesia_transaction(mnesia_update_fun(Key, Term)).
+
mnesia_update(VHost, Comp, Name, Term) ->
- F = fun () ->
- Res = case mnesia:read(?TABLE, {VHost, Comp, Name}, read) of
- [] -> new;
- [Params] -> {old, Params#runtime_parameters.value}
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_vhost:with(VHost, mnesia_update_fun({VHost, Comp, Name}, Term))).
+
+mnesia_update_fun(Key, Term) ->
+ fun () ->
+ Res = case mnesia:read(?TABLE, Key, read) of
+ [] -> new;
+ [Params] -> {old, Params#runtime_parameters.value}
end,
- ok = mnesia:write(?TABLE, c(VHost, Comp, Name, Term), write),
- Res
- end,
- rabbit_misc:execute_mnesia_transaction(rabbit_vhost:with(VHost, F)).
+ ok = mnesia:write(?TABLE, c(Key, Term), write),
+ Res
+ end.
clear(_, <<"policy">> , _) ->
{error_string, "policies may not be cleared using this method"};
@@ -159,43 +174,46 @@ list_formatted(VHost) ->
[pset(value, format(pget(value, P)), P) || P <- list(VHost)].
lookup(VHost, Component, Name) ->
- case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
+ case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> p(Params)
end.
-value(VHost, Component, Name) ->
- case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
+value(VHost, Comp, Name) -> value0({VHost, Comp, Name}).
+value(VHost, Comp, Name, Def) -> value0({VHost, Comp, Name}, Def).
+
+value_global(Key) -> value0(Key).
+value_global(Key, Default) -> value0(Key, Default).
+
+value0(Key) ->
+ case lookup0(Key, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> Params#runtime_parameters.value
end.
-value(VHost, Component, Name, Default) ->
- Params = lookup0(VHost, Component, Name,
- fun () ->
- lookup_missing(VHost, Component, Name, Default)
- end),
+value0(Key, Default) ->
+ Params = lookup0(Key, fun () -> lookup_missing(Key, Default) end),
Params#runtime_parameters.value.
-lookup0(VHost, Component, Name, DefaultFun) ->
- case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of
+lookup0(Key, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, Key) of
[] -> DefaultFun();
[R] -> R
end.
-lookup_missing(VHost, Component, Name, Default) ->
+lookup_missing(Key, Default) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read(?TABLE, {VHost, Component, Name}, read) of
- [] -> Record = c(VHost, Component, Name, Default),
+ case mnesia:read(?TABLE, Key, read) of
+ [] -> Record = c(Key, Default),
mnesia:write(?TABLE, Record, write),
Record;
[R] -> R
end
end).
-c(VHost, Component, Name, Default) ->
- #runtime_parameters{key = {VHost, Component, Name},
+c(Key, Default) ->
+ #runtime_parameters{key = Key,
value = Default}.
p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2d6ff73b..33c6354b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -39,7 +39,6 @@ all_tests() ->
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
ok = file_handle_cache:set_limit(10),
passed = test_version_equivalance(),
- passed = test_multi_call(),
passed = test_file_handle_cache(),
passed = test_backing_queue(),
passed = test_rabbit_basic_header_handling(),
@@ -66,6 +65,7 @@ all_tests() ->
passed = test_amqp_connection_refusal(),
passed = test_confirms(),
passed = test_with_state(),
+ passed = test_mcall(),
passed =
do_if_secondary_node(
fun run_cluster_dependent_tests/1,
@@ -156,26 +156,6 @@ test_version_equivalance() ->
false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"),
passed.
-test_multi_call() ->
- Fun = fun() ->
- receive
- {'$gen_call', {From, Mref}, request} ->
- From ! {Mref, response}
- end,
- receive
- never -> ok
- end
- end,
- Pid1 = spawn(Fun),
- Pid2 = spawn(Fun),
- Pid3 = spawn(Fun),
- exit(Pid2, bang),
- {[{Pid1, response}, {Pid3, response}], [{Pid2, _Fail}]} =
- rabbit_misc:multi_call([Pid1, Pid2, Pid3], request),
- exit(Pid1, bang),
- exit(Pid3, bang),
- passed.
-
test_rabbit_basic_header_handling() ->
passed = write_table_with_invalid_existing_type_test(),
passed = invalid_existing_headers_test(),
@@ -578,33 +558,38 @@ test_topic_matching() ->
key = list_to_binary(Key),
destination = #resource{virtual_host = <<"/">>,
kind = queue,
- name = list_to_binary(Q)}} ||
- {Key, Q} <- [{"a.b.c", "t1"},
- {"a.*.c", "t2"},
- {"a.#.b", "t3"},
- {"a.b.b.c", "t4"},
- {"#", "t5"},
- {"#.#", "t6"},
- {"#.b", "t7"},
- {"*.*", "t8"},
- {"a.*", "t9"},
- {"*.b.c", "t10"},
- {"a.#", "t11"},
- {"a.#.#", "t12"},
- {"b.b.c", "t13"},
- {"a.b.b", "t14"},
- {"a.b", "t15"},
- {"b.c", "t16"},
- {"", "t17"},
- {"*.*.*", "t18"},
- {"vodka.martini", "t19"},
- {"a.b.c", "t20"},
- {"*.#", "t21"},
- {"#.*.#", "t22"},
- {"*.#.#", "t23"},
- {"#.#.#", "t24"},
- {"*", "t25"},
- {"#.b.#", "t26"}]],
+ name = list_to_binary(Q)},
+ args = Args} ||
+ {Key, Q, Args} <- [{"a.b.c", "t1", []},
+ {"a.*.c", "t2", []},
+ {"a.#.b", "t3", []},
+ {"a.b.b.c", "t4", []},
+ {"#", "t5", []},
+ {"#.#", "t6", []},
+ {"#.b", "t7", []},
+ {"*.*", "t8", []},
+ {"a.*", "t9", []},
+ {"*.b.c", "t10", []},
+ {"a.#", "t11", []},
+ {"a.#.#", "t12", []},
+ {"b.b.c", "t13", []},
+ {"a.b.b", "t14", []},
+ {"a.b", "t15", []},
+ {"b.c", "t16", []},
+ {"", "t17", []},
+ {"*.*.*", "t18", []},
+ {"vodka.martini", "t19", []},
+ {"a.b.c", "t20", []},
+ {"*.#", "t21", []},
+ {"#.*.#", "t22", []},
+ {"*.#.#", "t23", []},
+ {"#.#.#", "t24", []},
+ {"*", "t25", []},
+ {"#.b.#", "t26", []},
+ {"args-test", "t27",
+ [{<<"foo">>, longstr, <<"bar">>}]},
+ {"args-test", "t27", %% Note aliasing
+ [{<<"foo">>, longstr, <<"baz">>}]}]],
lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
Bindings),
@@ -631,12 +616,13 @@ test_topic_matching() ->
"t22", "t23", "t24", "t26"]},
{"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
{"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
- "t25"]}]),
-
+ "t25"]},
+ {"args-test", ["t5", "t6", "t21", "t22", "t23", "t24",
+ "t25", "t27"]}]),
%% remove some bindings
RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
lists:nth(11, Bindings), lists:nth(19, Bindings),
- lists:nth(21, Bindings)],
+ lists:nth(21, Bindings), lists:nth(28, Bindings)],
exchange_op_callback(X, remove_bindings, [RemovedBindings]),
RemainingBindings = ordsets:to_list(
ordsets:subtract(ordsets:from_list(Bindings),
@@ -659,7 +645,8 @@ test_topic_matching() ->
{"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
"t24", "t26"]},
{"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
- {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]},
+ {"args-test", ["t6", "t22", "t23", "t24", "t25", "t27"]}]),
%% remove the entire exchange
exchange_op_callback(X, delete, [RemainingBindings]),
@@ -1042,6 +1029,9 @@ test_user_management() ->
ok = control_action(add_vhost, ["/testhost"]),
ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
[{"-p", "/testhost"}]),
+ {new, _} = rabbit_amqqueue:declare(
+ rabbit_misc:r(<<"/testhost">>, queue, <<"test">>),
+ true, false, [], none),
ok = control_action(delete_vhost, ["/testhost"]),
%% user deletion
@@ -1368,6 +1358,82 @@ test_with_state() ->
fun (S) -> element(1, S) end),
passed.
+test_mcall() ->
+ P1 = spawn(fun gs2_test_listener/0),
+ register(foo, P1),
+ global:register_name(gfoo, P1),
+
+ P2 = spawn(fun() -> exit(bang) end),
+ %% ensure P2 is dead (ignore the race setting up the monitor)
+ await_exit(P2),
+
+ P3 = spawn(fun gs2_test_crasher/0),
+
+ %% since P2 crashes almost immediately and P3 after receiving its first
+ %% message, we have to spawn a few more processes to handle the additional
+ %% cases we're interested in here
+ register(baz, spawn(fun gs2_test_crasher/0)),
+ register(bog, spawn(fun gs2_test_crasher/0)),
+ global:register_name(gbaz, spawn(fun gs2_test_crasher/0)),
+
+ NoNode = rabbit_nodes:make("nonode"),
+
+ Targets =
+ %% pids
+ [P1, P2, P3]
+ ++
+ %% registered names
+ [foo, bar, baz]
+ ++
+ %% {Name, Node} pairs
+ [{foo, node()}, {bar, node()}, {bog, node()}, {foo, NoNode}]
+ ++
+ %% {global, Name}
+ [{global, gfoo}, {global, gbar}, {global, gbaz}],
+
+ GoodResults = [{D, goodbye} || D <- [P1, foo,
+ {foo, node()},
+ {global, gfoo}]],
+
+ BadResults = [{P2, noproc}, % died before use
+ {P3, boom}, % died on first use
+ {bar, noproc}, % never registered
+ {baz, boom}, % died on first use
+ {{bar, node()}, noproc}, % never registered
+ {{bog, node()}, boom}, % died on first use
+ {{foo, NoNode}, nodedown}, % invalid node
+ {{global, gbar}, noproc}, % never registered globally
+ {{global, gbaz}, boom}], % died on first use
+
+ {Replies, Errors} = gen_server2:mcall([{T, hello} || T <- Targets]),
+ true = lists:sort(Replies) == lists:sort(GoodResults),
+ true = lists:sort(Errors) == lists:sort(BadResults),
+
+ %% cleanup (ignore the race setting up the monitor)
+ P1 ! stop,
+ await_exit(P1),
+ passed.
+
+await_exit(Pid) ->
+ MRef = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', MRef, _, _, _} -> ok
+ end.
+
+gs2_test_crasher() ->
+ receive
+ {'$gen_call', _From, hello} -> exit(boom)
+ end.
+
+gs2_test_listener() ->
+ receive
+ {'$gen_call', From, hello} ->
+ gen_server2:reply(From, goodbye),
+ gs2_test_listener();
+ stop ->
+ ok
+ end.
+
test_statistics_event_receiver(Pid) ->
receive
Foo -> Pid ! Foo, test_statistics_event_receiver(Pid)
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 90372461..4cb3cacc 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -47,6 +47,7 @@
-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
+-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
%% -------------------------------------------------------------------
@@ -355,6 +356,32 @@ internal_system_x() ->
[name, type, durable, auto_delete, internal, arguments, scratches, policy,
decorators]).
+cluster_name() ->
+ {atomic, ok} = mnesia:transaction(fun cluster_name_tx/0),
+ ok.
+
+cluster_name_tx() ->
+ %% mnesia:transform_table/4 does not let us delete records
+ T = rabbit_runtime_parameters,
+ mnesia:write_lock_table(T),
+ Ks = [K || {_VHost, <<"federation">>, <<"local-nodename">>} = K
+ <- mnesia:all_keys(T)],
+ case Ks of
+ [] -> ok;
+ [K|Tl] -> [{runtime_parameters, _K, Name}] = mnesia:read(T, K, write),
+ R = {runtime_parameters, cluster_name, Name},
+ mnesia:write(T, R, write),
+ case Tl of
+ [] -> ok;
+ _ -> {VHost, _, _} = K,
+ error_logger:warning_msg(
+ "Multiple local-nodenames found, picking '~s' "
+ "from '~s' for cluster name~n", [Name, VHost])
+ end
+ end,
+ [mnesia:delete(T, K, write) || K <- Ks],
+ ok.
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 020b5b33..321af4ac 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -643,6 +643,31 @@ drop(AckRequired, State) ->
ack([], State) ->
{[], State};
+%% optimisation: this head is essentially a partial evaluation of the
+%% general case below, for the single-ack case.
+ack([SeqId], State) ->
+ {#msg_status { msg_id = MsgId,
+ is_persistent = IsPersistent,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }} =
+ remove_pending_ack(SeqId, State),
+ IndexState1 = case IndexOnDisk of
+ true -> rabbit_queue_index:ack([SeqId], IndexState);
+ false -> IndexState
+ end,
+ case MsgOnDisk of
+ true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
+ false -> ok
+ end,
+ PCount1 = PCount - one_if(IsPersistent),
+ {[MsgId],
+ a(State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + 1 })};
ack(AckTags, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
@@ -730,20 +755,20 @@ set_ram_duration_target(
false -> reduce_memory_use(State1)
end).
-update_rates(State = #vqstate{ in_counter = InCount,
- out_counter = OutCount,
- ack_in_counter = AckInCount,
+update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount,
+ ack_in_counter = AckInCount,
ack_out_counter = AckOutCount,
- rates = #rates{ in = InRate,
- out = OutRate,
- ack_in = AckInRate,
+ rates = #rates{ in = InRate,
+ out = OutRate,
+ ack_in = AckInRate,
ack_out = AckOutRate,
timestamp = TS }}) ->
Now = erlang:now(),
- Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
- out = update_rate(Now, TS, OutCount, OutRate),
- ack_in = update_rate(Now, TS, AckInCount, AckInRate),
+ Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
+ out = update_rate(Now, TS, OutCount, OutRate),
+ ack_in = update_rate(Now, TS, AckInCount, AckInRate),
ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
timestamp = Now },
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 047bce77..9fa4da44 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -83,9 +83,9 @@ delete(VHostPath) ->
%% eventually the termination of that process. Exchange deletion causes
%% notifications which must be sent outside the TX
rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]),
- [{ok,_} = rabbit_amqqueue:delete(Q, false, false) ||
+ [assert_benign(rabbit_amqqueue:delete(Q, false, false)) ||
Q <- rabbit_amqqueue:list(VHostPath)],
- [ok = rabbit_exchange:delete(Name, false) ||
+ [assert_benign(rabbit_exchange:delete(Name, false)) ||
#exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
R = rabbit_misc:execute_mnesia_transaction(
with(VHostPath, fun () ->
@@ -94,6 +94,18 @@ delete(VHostPath) ->
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
R.
+assert_benign(ok) -> ok;
+assert_benign({ok, _}) -> ok;
+assert_benign({error, not_found}) -> ok;
+assert_benign({error, {absent, Q}}) ->
+ %% We have a durable queue on a down node. Removing the mnesia
+ %% entries here is safe. If/when the down node restarts, it will
+ %% clear out the on-disk storage of the queue.
+ case rabbit_amqqueue:internal_delete(Q#amqqueue.name) of
+ ok -> ok;
+ {error, not_found} -> ok
+ end.
+
internal_delete(VHostPath) ->
[ok = rabbit_auth_backend_internal:clear_permissions(
proplists:get_value(user, Info), VHostPath)