summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-02-07 13:16:28 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-02-07 13:16:28 +0000
commit30d56f369d7c6d2718c414d0d83727baedb1acee (patch)
tree0ddcf4922a77af53766dae36202da6ffbf3aefce
parent9b22b0165ae290af06144e438dff00a1c1fb860e (diff)
parenta49aa42ec838915e7f36d9df6ebeea394f94604c (diff)
downloadrabbitmq-server-30d56f369d7c6d2718c414d0d83727baedb1acee.tar.gz
Merge bug25985
-rw-r--r--src/gen_server2.erl88
-rw-r--r--src/rabbit_amqqueue.erl18
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_auth_backend_internal.erl268
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_mirror_queue_misc.erl79
-rw-r--r--src/rabbit_misc.erl32
-rw-r--r--src/rabbit_tests.erl101
-rw-r--r--src/rabbit_variable_queue.erl18
-rw-r--r--src/rabbit_vhost.erl16
10 files changed, 376 insertions, 250 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 6690d181..ee82bcb3 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -81,6 +81,14 @@
%% process as sys:get_status/1 would). Pass through a function which
%% can be invoked on the state, get back the result. The state is not
%% modified.
+%%
+%% 10) an mcall/1 function has been added for performing multiple
+%% call/3 in parallel. Unlike multi_call, which sends the same request
+%% to same-named processes residing on a supplied list of nodes, it
+%% operates on name/request pairs, where name is anything accepted by
+%% call/3, i.e. a pid, global name, local name, or local name on a
+%% particular node.
+%%
%% All modifications are (C) 2009-2013 GoPivotal, Inc.
@@ -190,6 +198,7 @@
cast/2, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
+ mcall/1,
with_state/2,
enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
@@ -389,6 +398,85 @@ multi_call(Nodes, Name, Req, Timeout)
when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Req, Timeout).
+%%% -----------------------------------------------------------------
+%%% Make multiple calls to multiple servers, given pairs of servers
+%%% and messages.
+%%% Returns: {[{Dest, Reply}], [{Dest, Error}]}
+%%%
+%%% Dest can be pid() | RegName :: atom() |
+%%% {Name :: atom(), Node :: atom()} | {global, Name :: atom()}
+%%%
+%%% A middleman process is used to avoid clogging up the callers
+%%% message queue.
+%%% -----------------------------------------------------------------
+mcall(CallSpecs) ->
+ Tag = make_ref(),
+ {_, MRef} = spawn_monitor(
+ fun() ->
+ Refs = lists:foldl(
+ fun ({Dest, _Request}=S, Dict) ->
+ dict:store(do_mcall(S), Dest, Dict)
+ end, dict:new(), CallSpecs),
+ collect_replies(Tag, Refs, [], [])
+ end),
+ receive
+ {'DOWN', MRef, _, _, {Tag, Result}} -> Result;
+ {'DOWN', MRef, _, _, Reason} -> exit(Reason)
+ end.
+
+do_mcall({{global,Name}=Dest, Request}) ->
+ %% whereis_name is simply an ets lookup, and is precisely what
+ %% global:send/2 does, yet we need a Ref to put in the call to the
+ %% server, so invoking whereis_name makes a lot more sense here.
+ case global:whereis_name(Name) of
+ Pid when is_pid(Pid) ->
+ MRef = erlang:monitor(process, Pid),
+ catch msend(Pid, MRef, Request),
+ MRef;
+ undefined ->
+ Ref = make_ref(),
+ self() ! {'DOWN', Ref, process, Dest, noproc},
+ Ref
+ end;
+do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) ->
+ {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6
+ catch msend(Dest, MRef, Request),
+ MRef;
+do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) ->
+ MRef = erlang:monitor(process, Dest),
+ catch msend(Dest, MRef, Request),
+ MRef.
+
+msend(Dest, MRef, Request) ->
+ erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]).
+
+collect_replies(Tag, Refs, Replies, Errors) ->
+ case dict:size(Refs) of
+ 0 -> exit({Tag, {Replies, Errors}});
+ _ -> receive
+ {MRef, Reply} ->
+ {Refs1, Replies1} = handle_call_result(MRef, Reply,
+ Refs, Replies),
+ collect_replies(Tag, Refs1, Replies1, Errors);
+ {'DOWN', MRef, _, _, Reason} ->
+ Reason1 = case Reason of
+ noconnection -> nodedown;
+ _ -> Reason
+ end,
+ {Refs1, Errors1} = handle_call_result(MRef, Reason1,
+ Refs, Errors),
+ collect_replies(Tag, Refs1, Replies, Errors1)
+ end
+ end.
+
+handle_call_result(MRef, Result, Refs, AccList) ->
+ %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2}
+ %% here, so we must cope with MRefs that we've already seen and erased
+ case dict:find(MRef, Refs) of
+ {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]};
+ _ -> {Refs, AccList}
+ end.
+
%% -----------------------------------------------------------------
%% Apply a function to a generic server's state.
%% -----------------------------------------------------------------
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index eeb0e0bf..2b86435d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -354,14 +354,14 @@ with(Name, F, E) ->
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
- %% with the QPid.
+ %% with the QPid. F() should be written s.t. that this
+ %% cannot happen, so we bail if it does since that
+ %% indicates a code bug and we don't want to get stuck in
+ %% the retry loop.
rabbit_misc:with_exit_handler(
- fun () ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> E(not_found_or_absent_dirty(Name));
- false -> timer:sleep(25),
- with(Name, F, E)
- end
+ fun () -> false = rabbit_misc:is_process_alive(QPid),
+ timer:sleep(25),
+ with(Name, F, E)
end, fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
@@ -506,8 +506,8 @@ force_event_refresh() -> force_event_refresh([Q#amqqueue.name || Q <- list()]).
force_event_refresh(QNames) ->
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
- {_, Bad} = rabbit_misc:multi_call(
- [Q#amqqueue.pid || Q <- Qs], force_event_refresh),
+ {_, Bad} = gen_server2:mcall(
+ [{Q#amqqueue.pid, force_event_refresh} || Q <- Qs]),
FailedPids = [Pid || {Pid, _Reason} <- Bad],
Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
lists:member(Pid, FailedPids)],
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 32b1a2e0..66b57ce8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -20,8 +20,8 @@
-behaviour(gen_server2).
--define(SYNC_INTERVAL, 25). %% milliseconds
--define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(SYNC_INTERVAL, 200). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster
-export([start_link/1, info_keys/0]).
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 61919d05..ebeac1f7 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -22,15 +22,18 @@
-export([description/0]).
-export([check_user_login/2, check_vhost_access/2, check_resource_access/3]).
--export([add_user/2, delete_user/1, change_password/2, set_tags/2,
- list_users/0, user_info_keys/0, lookup_user/1, clear_password/1]).
--export([make_salt/0, check_password/2, change_password_hash/2,
- hash_password/1]).
--export([set_permissions/5, clear_permissions/2,
- list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
- list_user_vhost_permissions/2, perms_info_keys/0,
- vhost_perms_info_keys/0, user_perms_info_keys/0,
- user_vhost_perms_info_keys/0]).
+-export([add_user/2, delete_user/1, lookup_user/1,
+ change_password/2, clear_password/1,
+ hash_password/1, change_password_hash/2,
+ set_tags/2, set_permissions/5, clear_permissions/2]).
+-export([user_info_keys/0, perms_info_keys/0,
+ user_perms_info_keys/0, vhost_perms_info_keys/0,
+ user_vhost_perms_info_keys/0,
+ list_users/0, list_permissions/0,
+ list_user_permissions/1, list_vhost_permissions/1,
+ list_user_vhost_permissions/2]).
+
+%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -38,45 +41,39 @@
-spec(add_user/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok').
-spec(delete_user/1 :: (rabbit_types:username()) -> 'ok').
+-spec(lookup_user/1 :: (rabbit_types:username())
+ -> rabbit_types:ok(rabbit_types:internal_user())
+ | rabbit_types:error('not_found')).
-spec(change_password/2 :: (rabbit_types:username(), rabbit_types:password())
-> 'ok').
-spec(clear_password/1 :: (rabbit_types:username()) -> 'ok').
--spec(make_salt/0 :: () -> binary()).
--spec(check_password/2 :: (rabbit_types:password(),
- rabbit_types:password_hash()) -> boolean()).
--spec(change_password_hash/2 :: (rabbit_types:username(),
- rabbit_types:password_hash()) -> 'ok').
-spec(hash_password/1 :: (rabbit_types:password())
-> rabbit_types:password_hash()).
+-spec(change_password_hash/2 :: (rabbit_types:username(),
+ rabbit_types:password_hash()) -> 'ok').
-spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok').
--spec(list_users/0 :: () -> [rabbit_types:infos()]).
--spec(user_info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(lookup_user/1 :: (rabbit_types:username())
- -> rabbit_types:ok(rabbit_types:internal_user())
- | rabbit_types:error('not_found')).
-spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(),
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
-> 'ok').
+-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(user_vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(list_users/0 :: () -> [rabbit_types:infos()]).
-spec(list_permissions/0 :: () -> [rabbit_types:infos()]).
--spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_user_permissions/1 ::
(rabbit_types:username()) -> [rabbit_types:infos()]).
+-spec(list_vhost_permissions/1 ::
+ (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_user_vhost_permissions/2 ::
(rabbit_types:username(), rabbit_types:vhost())
-> [rabbit_types:infos()]).
--spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(user_vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
+
-endif.
%%----------------------------------------------------------------------------
-
--define(PERMS_INFO_KEYS, [configure, write, read]).
--define(USER_INFO_KEYS, [user, tags]).
-
%% Implementation of rabbit_auth_backend
description() ->
@@ -85,11 +82,14 @@ description() ->
check_user_login(Username, []) ->
internal_check_user_login(Username, fun(_) -> true end);
-check_user_login(Username, [{password, Password}]) ->
+check_user_login(Username, [{password, Cleartext}]) ->
internal_check_user_login(
- Username, fun(#internal_user{password_hash = Hash}) ->
- check_password(Password, Hash)
- end);
+ Username,
+ fun (#internal_user{password_hash = <<Salt:4/binary, Hash/binary>>}) ->
+ Hash =:= salted_md5(Salt, Cleartext);
+ (#internal_user{}) ->
+ false
+ end);
check_user_login(Username, AuthProps) ->
exit({unknown_auth_props, Username, AuthProps}).
@@ -145,42 +145,43 @@ permission_index(read) -> #permission.read.
add_user(Username, Password) ->
rabbit_log:info("Creating user '~s'~n", [Username]),
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_user, Username}) of
- [] ->
- ok = mnesia:write(
- rabbit_user,
- #internal_user{username = Username,
- password_hash =
- hash_password(Password),
- tags = []},
- write);
- _ ->
- mnesia:abort({user_already_exists, Username})
- end
- end),
- R.
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_user, Username}) of
+ [] ->
+ ok = mnesia:write(
+ rabbit_user,
+ #internal_user{username = Username,
+ password_hash =
+ hash_password(Password),
+ tags = []},
+ write);
+ _ ->
+ mnesia:abort({user_already_exists, Username})
+ end
+ end).
delete_user(Username) ->
rabbit_log:info("Deleting user '~s'~n", [Username]),
- R = rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- ok = mnesia:delete({rabbit_user, Username}),
- [ok = mnesia:delete_object(
- rabbit_user_permission, R, write) ||
- R <- mnesia:match_object(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = '_'},
- permission = '_'},
- write)],
- ok
- end)),
- R.
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ ok = mnesia:delete({rabbit_user, Username}),
+ [ok = mnesia:delete_object(
+ rabbit_user_permission, R, write) ||
+ R <- mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = '_'},
+ permission = '_'},
+ write)],
+ ok
+ end)).
+
+lookup_user(Username) ->
+ rabbit_misc:dirty_read({rabbit_user, Username}).
change_password(Username, Password) ->
rabbit_log:info("Changing password for '~s'~n", [Username]),
@@ -190,70 +191,44 @@ clear_password(Username) ->
rabbit_log:info("Clearing password for '~s'~n", [Username]),
change_password_hash(Username, <<"">>).
-change_password_hash(Username, PasswordHash) ->
- R = update_user(Username, fun(User) ->
- User#internal_user{
- password_hash = PasswordHash }
- end),
- R.
-
hash_password(Cleartext) ->
- Salt = make_salt(),
- Hash = salted_md5(Salt, Cleartext),
- <<Salt/binary, Hash/binary>>.
-
-check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) ->
- Hash =:= salted_md5(Salt, Cleartext);
-check_password(_Cleartext, _Any) ->
- false.
-
-make_salt() ->
{A1,A2,A3} = now(),
random:seed(A1, A2, A3),
Salt = random:uniform(16#ffffffff),
- <<Salt:32>>.
+ SaltBin = <<Salt:32>>,
+ Hash = salted_md5(SaltBin, Cleartext),
+ <<SaltBin/binary, Hash/binary>>.
+
+change_password_hash(Username, PasswordHash) ->
+ update_user(Username, fun(User) ->
+ User#internal_user{
+ password_hash = PasswordHash }
+ end).
salted_md5(Salt, Cleartext) ->
Salted = <<Salt/binary, Cleartext/binary>>,
erlang:md5(Salted).
set_tags(Username, Tags) ->
- rabbit_log:info("Setting user tags for user '~s' to ~p~n", [Username, Tags]),
- R = update_user(Username, fun(User) ->
- User#internal_user{tags = Tags}
- end),
- R.
-
-update_user(Username, Fun) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- {ok, User} = lookup_user(Username),
- ok = mnesia:write(rabbit_user, Fun(User), write)
- end)).
-
-list_users() ->
- [[{user, Username}, {tags, Tags}] ||
- #internal_user{username = Username, tags = Tags} <-
- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
-
-user_info_keys() -> ?USER_INFO_KEYS.
-
-lookup_user(Username) ->
- rabbit_misc:dirty_read({rabbit_user, Username}).
-
-validate_regexp(RegexpBin) ->
- Regexp = binary_to_list(RegexpBin),
- case re:compile(Regexp) of
- {ok, _} -> ok;
- {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
- end.
+ rabbit_log:info("Setting user tags for user '~s' to ~p~n",
+ [Username, Tags]),
+ update_user(Username, fun(User) ->
+ User#internal_user{tags = Tags}
+ end).
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
- rabbit_log:info("Setting permissions for '~s' in '~s' to '~s', '~s', '~s'~n",
+ rabbit_log:info("Setting permissions for "
+ "'~s' in '~s' to '~s', '~s', '~s'~n",
[Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]),
- lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
+ lists:map(
+ fun (RegexpBin) ->
+ Regexp = binary_to_list(RegexpBin),
+ case re:compile(Regexp) of
+ {ok, _} -> ok;
+ {error, Reason} -> throw({error, {invalid_regexp,
+ Regexp, Reason}})
+ end
+ end, [ConfigurePerm, WritePerm, ReadPerm]),
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
@@ -269,7 +244,6 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
write)
end)).
-
clear_permissions(Username, VHostPath) ->
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
@@ -280,32 +254,36 @@ clear_permissions(Username, VHostPath) ->
virtual_host = VHostPath}})
end)).
+update_user(Username, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ {ok, User} = lookup_user(Username),
+ ok = mnesia:write(rabbit_user, Fun(User), write)
+ end)).
+
+%%----------------------------------------------------------------------------
+%% Listing
+
+-define(PERMS_INFO_KEYS, [configure, write, read]).
+-define(USER_INFO_KEYS, [user, tags]).
+
+user_info_keys() -> ?USER_INFO_KEYS.
+
perms_info_keys() -> [user, vhost | ?PERMS_INFO_KEYS].
vhost_perms_info_keys() -> [user | ?PERMS_INFO_KEYS].
user_perms_info_keys() -> [vhost | ?PERMS_INFO_KEYS].
user_vhost_perms_info_keys() -> ?PERMS_INFO_KEYS.
+list_users() ->
+ [[{user, Username}, {tags, Tags}] ||
+ #internal_user{username = Username, tags = Tags} <-
+ mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
+
list_permissions() ->
list_permissions(perms_info_keys(), match_user_vhost('_', '_')).
-list_vhost_permissions(VHostPath) ->
- list_permissions(
- vhost_perms_info_keys(),
- rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))).
-
-list_user_permissions(Username) ->
- list_permissions(
- user_perms_info_keys(),
- rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))).
-
-list_user_vhost_permissions(Username, VHostPath) ->
- list_permissions(
- user_vhost_perms_info_keys(),
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath, match_user_vhost(Username, VHostPath))).
-
-filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)].
-
list_permissions(Keys, QueryThunk) ->
[filter_props(Keys, [{user, Username},
{vhost, VHostPath},
@@ -320,6 +298,24 @@ list_permissions(Keys, QueryThunk) ->
%% TODO: use dirty ops instead
rabbit_misc:execute_mnesia_transaction(QueryThunk)].
+filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)].
+
+list_user_permissions(Username) ->
+ list_permissions(
+ user_perms_info_keys(),
+ rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))).
+
+list_vhost_permissions(VHostPath) ->
+ list_permissions(
+ vhost_perms_info_keys(),
+ rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))).
+
+list_user_vhost_permissions(Username, VHostPath) ->
+ list_permissions(
+ user_vhost_perms_info_keys(),
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath, match_user_vhost(Username, VHostPath))).
+
match_user_vhost(Username, VHostPath) ->
fun () -> mnesia:match_object(
rabbit_user_permission,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 11e6bd38..bb9c61a8 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -169,7 +169,7 @@ add(Binding, InnerFun) ->
ok ->
case mnesia:read({rabbit_route, B}) of
[] -> add(Src, Dst, B);
- [_] -> fun rabbit_misc:const_ok/0
+ [_] -> fun () -> ok end
end;
{error, _} = Err ->
rabbit_misc:const(Err)
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ca495733..4f77009c 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -148,53 +148,54 @@ drop_mirrors(QName, Nodes) ->
ok.
drop_mirror(QName, MirrorNode) ->
- rabbit_amqqueue:with(
- QName,
- fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) ->
- case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] ->
- {error, {queue_not_mirrored_on_node, MirrorNode}};
- [QPid] when SPids =:= [] ->
- {error, cannot_drop_only_mirror};
- [Pid] ->
- rabbit_log:info(
- "Dropping queue mirror on node ~p for ~s~n",
- [MirrorNode, rabbit_misc:rs(Name)]),
- exit(Pid, {shutdown, dropped}),
- {ok, dropped}
- end
- end).
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids }} ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ {error, {queue_not_mirrored_on_node, MirrorNode}};
+ [QPid] when SPids =:= [] ->
+ {error, cannot_drop_only_mirror};
+ [Pid] ->
+ rabbit_log:info(
+ "Dropping queue mirror on node ~p for ~s~n",
+ [MirrorNode, rabbit_misc:rs(Name)]),
+ exit(Pid, {shutdown, dropped}),
+ {ok, dropped}
+ end;
+ {error, not_found} = E ->
+ E
+ end.
add_mirrors(QName, Nodes, SyncMode) ->
[add_mirror(QName, Node, SyncMode) || Node <- Nodes],
ok.
add_mirror(QName, MirrorNode, SyncMode) ->
- rabbit_amqqueue:with(
- QName,
- fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
- case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] ->
- start_child(Name, MirrorNode, Q, SyncMode);
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true -> {ok, already_mirrored};
- false -> start_child(Name, MirrorNode, Q, SyncMode)
- end
- end
- end).
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ start_child(Name, MirrorNode, Q, SyncMode);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true -> {ok, already_mirrored};
+ false -> start_child(Name, MirrorNode, Q, SyncMode)
+ end
+ end;
+ {error, not_found} = E ->
+ E
+ end.
start_child(Name, MirrorNode, Q, SyncMode) ->
- case rabbit_misc:with_exit_handler(
- rabbit_misc:const(down),
- fun () ->
- rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
- end) of
- {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid, SyncMode);
- _ -> ok
- end.
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun () ->
+ {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child(
+ MirrorNode, [Q]),
+ rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode)
+ end).
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 1c63980e..848c4a87 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -53,13 +53,12 @@
-export([parse_arguments/3]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
--export([const_ok/0, const/1]).
+-export([const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
-export([pget/2, pget/3, pget_or_die/2, pset/3]).
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
--export([multi_call/2]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
-export([version/0, which_applications/0]).
@@ -219,7 +218,6 @@
{bad_edge, [digraph:vertex()]}),
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
--spec(const_ok/0 :: () -> 'ok').
-spec(const/1 :: (A) -> thunk(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
@@ -230,8 +228,6 @@
-spec(pset/3 :: (term(), term(), [term()]) -> term()).
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
--spec(multi_call/2 ::
- ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
-spec(version/0 :: () -> string()).
@@ -891,7 +887,6 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
{error, Reason}
end.
-const_ok() -> ok.
const(X) -> fun () -> X end.
%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
@@ -950,31 +945,6 @@ append_rpc_all_nodes(Nodes, M, F, A) ->
_ -> Res
end || Res <- ResL]).
-%% A simplified version of gen_server:multi_call/2 with a sane
-%% API. This is not in gen_server2 as there is no useful
-%% infrastructure there to share.
-multi_call(Pids, Req) ->
- MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids],
- receive_multi_call(MonitorPids, [], []).
-
-start_multi_call(Pid, Req) when is_pid(Pid) ->
- Mref = erlang:monitor(process, Pid),
- Pid ! {'$gen_call', {self(), Mref}, Req},
- {Mref, Pid}.
-
-receive_multi_call([], Good, Bad) ->
- {lists:reverse(Good), lists:reverse(Bad)};
-receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) ->
- receive
- {Mref, Reply} ->
- erlang:demonitor(Mref, [flush]),
- receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad);
- {'DOWN', Mref, _, _, noconnection} ->
- receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]);
- {'DOWN', Mref, _, _, Reason} ->
- receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad])
- end.
-
os_cmd(Command) ->
case os:type() of
{win32, _} ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 74ff2adb..33c6354b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -39,7 +39,6 @@ all_tests() ->
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
ok = file_handle_cache:set_limit(10),
passed = test_version_equivalance(),
- passed = test_multi_call(),
passed = test_file_handle_cache(),
passed = test_backing_queue(),
passed = test_rabbit_basic_header_handling(),
@@ -66,6 +65,7 @@ all_tests() ->
passed = test_amqp_connection_refusal(),
passed = test_confirms(),
passed = test_with_state(),
+ passed = test_mcall(),
passed =
do_if_secondary_node(
fun run_cluster_dependent_tests/1,
@@ -156,26 +156,6 @@ test_version_equivalance() ->
false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"),
passed.
-test_multi_call() ->
- Fun = fun() ->
- receive
- {'$gen_call', {From, Mref}, request} ->
- From ! {Mref, response}
- end,
- receive
- never -> ok
- end
- end,
- Pid1 = spawn(Fun),
- Pid2 = spawn(Fun),
- Pid3 = spawn(Fun),
- exit(Pid2, bang),
- {[{Pid1, response}, {Pid3, response}], [{Pid2, _Fail}]} =
- rabbit_misc:multi_call([Pid1, Pid2, Pid3], request),
- exit(Pid1, bang),
- exit(Pid3, bang),
- passed.
-
test_rabbit_basic_header_handling() ->
passed = write_table_with_invalid_existing_type_test(),
passed = invalid_existing_headers_test(),
@@ -1049,6 +1029,9 @@ test_user_management() ->
ok = control_action(add_vhost, ["/testhost"]),
ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
[{"-p", "/testhost"}]),
+ {new, _} = rabbit_amqqueue:declare(
+ rabbit_misc:r(<<"/testhost">>, queue, <<"test">>),
+ true, false, [], none),
ok = control_action(delete_vhost, ["/testhost"]),
%% user deletion
@@ -1375,6 +1358,82 @@ test_with_state() ->
fun (S) -> element(1, S) end),
passed.
+test_mcall() ->
+ P1 = spawn(fun gs2_test_listener/0),
+ register(foo, P1),
+ global:register_name(gfoo, P1),
+
+ P2 = spawn(fun() -> exit(bang) end),
+ %% ensure P2 is dead (ignore the race setting up the monitor)
+ await_exit(P2),
+
+ P3 = spawn(fun gs2_test_crasher/0),
+
+ %% since P2 crashes almost immediately and P3 after receiving its first
+ %% message, we have to spawn a few more processes to handle the additional
+ %% cases we're interested in here
+ register(baz, spawn(fun gs2_test_crasher/0)),
+ register(bog, spawn(fun gs2_test_crasher/0)),
+ global:register_name(gbaz, spawn(fun gs2_test_crasher/0)),
+
+ NoNode = rabbit_nodes:make("nonode"),
+
+ Targets =
+ %% pids
+ [P1, P2, P3]
+ ++
+ %% registered names
+ [foo, bar, baz]
+ ++
+ %% {Name, Node} pairs
+ [{foo, node()}, {bar, node()}, {bog, node()}, {foo, NoNode}]
+ ++
+ %% {global, Name}
+ [{global, gfoo}, {global, gbar}, {global, gbaz}],
+
+ GoodResults = [{D, goodbye} || D <- [P1, foo,
+ {foo, node()},
+ {global, gfoo}]],
+
+ BadResults = [{P2, noproc}, % died before use
+ {P3, boom}, % died on first use
+ {bar, noproc}, % never registered
+ {baz, boom}, % died on first use
+ {{bar, node()}, noproc}, % never registered
+ {{bog, node()}, boom}, % died on first use
+ {{foo, NoNode}, nodedown}, % invalid node
+ {{global, gbar}, noproc}, % never registered globally
+ {{global, gbaz}, boom}], % died on first use
+
+ {Replies, Errors} = gen_server2:mcall([{T, hello} || T <- Targets]),
+ true = lists:sort(Replies) == lists:sort(GoodResults),
+ true = lists:sort(Errors) == lists:sort(BadResults),
+
+ %% cleanup (ignore the race setting up the monitor)
+ P1 ! stop,
+ await_exit(P1),
+ passed.
+
+await_exit(Pid) ->
+ MRef = erlang:monitor(process, Pid),
+ receive
+ {'DOWN', MRef, _, _, _} -> ok
+ end.
+
+gs2_test_crasher() ->
+ receive
+ {'$gen_call', _From, hello} -> exit(boom)
+ end.
+
+gs2_test_listener() ->
+ receive
+ {'$gen_call', From, hello} ->
+ gen_server2:reply(From, goodbye),
+ gs2_test_listener();
+ stop ->
+ ok
+ end.
+
test_statistics_event_receiver(Pid) ->
receive
Foo -> Pid ! Foo, test_statistics_event_receiver(Pid)
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9d242316..321af4ac 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -755,20 +755,20 @@ set_ram_duration_target(
false -> reduce_memory_use(State1)
end).
-update_rates(State = #vqstate{ in_counter = InCount,
- out_counter = OutCount,
- ack_in_counter = AckInCount,
+update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount,
+ ack_in_counter = AckInCount,
ack_out_counter = AckOutCount,
- rates = #rates{ in = InRate,
- out = OutRate,
- ack_in = AckInRate,
+ rates = #rates{ in = InRate,
+ out = OutRate,
+ ack_in = AckInRate,
ack_out = AckOutRate,
timestamp = TS }}) ->
Now = erlang:now(),
- Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
- out = update_rate(Now, TS, OutCount, OutRate),
- ack_in = update_rate(Now, TS, AckInCount, AckInRate),
+ Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
+ out = update_rate(Now, TS, OutCount, OutRate),
+ ack_in = update_rate(Now, TS, AckInCount, AckInRate),
ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
timestamp = Now },
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 047bce77..9fa4da44 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -83,9 +83,9 @@ delete(VHostPath) ->
%% eventually the termination of that process. Exchange deletion causes
%% notifications which must be sent outside the TX
rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]),
- [{ok,_} = rabbit_amqqueue:delete(Q, false, false) ||
+ [assert_benign(rabbit_amqqueue:delete(Q, false, false)) ||
Q <- rabbit_amqqueue:list(VHostPath)],
- [ok = rabbit_exchange:delete(Name, false) ||
+ [assert_benign(rabbit_exchange:delete(Name, false)) ||
#exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
R = rabbit_misc:execute_mnesia_transaction(
with(VHostPath, fun () ->
@@ -94,6 +94,18 @@ delete(VHostPath) ->
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
R.
+assert_benign(ok) -> ok;
+assert_benign({ok, _}) -> ok;
+assert_benign({error, not_found}) -> ok;
+assert_benign({error, {absent, Q}}) ->
+ %% We have a durable queue on a down node. Removing the mnesia
+ %% entries here is safe. If/when the down node restarts, it will
+ %% clear out the on-disk storage of the queue.
+ case rabbit_amqqueue:internal_delete(Q#amqqueue.name) of
+ ok -> ok;
+ {error, not_found} -> ok
+ end.
+
internal_delete(VHostPath) ->
[ok = rabbit_auth_backend_internal:clear_permissions(
proplists:get_value(user, Info), VHostPath)