diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-02-10 14:38:34 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-02-10 14:38:34 +0000 |
commit | ea47f993c30267de8a16180f3c406e773cd79490 (patch) | |
tree | b2e201e9cc17a260673ffbed56c2354bc0c21850 | |
parent | 7f750a915d224aecd8f156be0a9aac2c2a5b4550 (diff) | |
parent | 30d56f369d7c6d2718c414d0d83727baedb1acee (diff) | |
download | rabbitmq-server-ea47f993c30267de8a16180f3c406e773cd79490.tar.gz |
Merge in default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 17 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/gen_server2.erl | 88 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 18 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 61 | ||||
-rw-r--r-- | src/rabbit_auth_backend_dummy.erl | 49 | ||||
-rw-r--r-- | src/rabbit_auth_backend_internal.erl | 268 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 20 | ||||
-rw-r--r-- | src/rabbit_channel_interceptor.erl | 11 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 5 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 28 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 25 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 79 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 32 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 1 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 15 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 19 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters.erl | 64 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 170 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 27 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 43 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 16 |
23 files changed, 682 insertions, 378 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index d19acd00..a7e42503 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -502,6 +502,23 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>set_cluster_name</command> <arg choice="req">name</arg></cmdsynopsis></term> + <listitem> + <para> + Sets the cluster name. The cluster name is announced to + clients on connection, and used by the federation and + shovel plugins to record where a message has been. The + cluster name is by default derived from the hostname of + the first node in the cluster, but can be changed. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_cluster_name london</screen> + <para role="example"> + This sets the cluster name to "london". + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 00f7341f..19eef65a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -60,7 +60,7 @@ -record(trie_node, {exchange_name, node_id}). -record(trie_edge, {exchange_name, node_id, word}). --record(trie_binding, {exchange_name, node_id, destination}). +-record(trie_binding, {exchange_name, node_id, destination, arguments}). -record(listener, {node, protocol, host, ip_address, port}). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 6690d181..ee82bcb3 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -81,6 +81,14 @@ %% process as sys:get_status/1 would). Pass through a function which %% can be invoked on the state, get back the result. The state is not %% modified. +%% +%% 10) an mcall/1 function has been added for performing multiple +%% call/3 in parallel. Unlike multi_call, which sends the same request +%% to same-named processes residing on a supplied list of nodes, it +%% operates on name/request pairs, where name is anything accepted by +%% call/3, i.e. a pid, global name, local name, or local name on a +%% particular node. +%% %% All modifications are (C) 2009-2013 GoPivotal, Inc. @@ -190,6 +198,7 @@ cast/2, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, + mcall/1, with_state/2, enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). @@ -389,6 +398,85 @@ multi_call(Nodes, Name, Req, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> do_multi_call(Nodes, Name, Req, Timeout). +%%% ----------------------------------------------------------------- +%%% Make multiple calls to multiple servers, given pairs of servers +%%% and messages. +%%% Returns: {[{Dest, Reply}], [{Dest, Error}]} +%%% +%%% Dest can be pid() | RegName :: atom() | +%%% {Name :: atom(), Node :: atom()} | {global, Name :: atom()} +%%% +%%% A middleman process is used to avoid clogging up the callers +%%% message queue. +%%% ----------------------------------------------------------------- +mcall(CallSpecs) -> + Tag = make_ref(), + {_, MRef} = spawn_monitor( + fun() -> + Refs = lists:foldl( + fun ({Dest, _Request}=S, Dict) -> + dict:store(do_mcall(S), Dest, Dict) + end, dict:new(), CallSpecs), + collect_replies(Tag, Refs, [], []) + end), + receive + {'DOWN', MRef, _, _, {Tag, Result}} -> Result; + {'DOWN', MRef, _, _, Reason} -> exit(Reason) + end. + +do_mcall({{global,Name}=Dest, Request}) -> + %% whereis_name is simply an ets lookup, and is precisely what + %% global:send/2 does, yet we need a Ref to put in the call to the + %% server, so invoking whereis_name makes a lot more sense here. + case global:whereis_name(Name) of + Pid when is_pid(Pid) -> + MRef = erlang:monitor(process, Pid), + catch msend(Pid, MRef, Request), + MRef; + undefined -> + Ref = make_ref(), + self() ! {'DOWN', Ref, process, Dest, noproc}, + Ref + end; +do_mcall({{Name,Node}=Dest, Request}) when is_atom(Name), is_atom(Node) -> + {_Node, MRef} = start_monitor(Node, Name), %% NB: we don't handle R6 + catch msend(Dest, MRef, Request), + MRef; +do_mcall({Dest, Request}) when is_atom(Dest); is_pid(Dest) -> + MRef = erlang:monitor(process, Dest), + catch msend(Dest, MRef, Request), + MRef. + +msend(Dest, MRef, Request) -> + erlang:send(Dest, {'$gen_call', {self(), MRef}, Request}, [noconnect]). + +collect_replies(Tag, Refs, Replies, Errors) -> + case dict:size(Refs) of + 0 -> exit({Tag, {Replies, Errors}}); + _ -> receive + {MRef, Reply} -> + {Refs1, Replies1} = handle_call_result(MRef, Reply, + Refs, Replies), + collect_replies(Tag, Refs1, Replies1, Errors); + {'DOWN', MRef, _, _, Reason} -> + Reason1 = case Reason of + noconnection -> nodedown; + _ -> Reason + end, + {Refs1, Errors1} = handle_call_result(MRef, Reason1, + Refs, Errors), + collect_replies(Tag, Refs1, Replies, Errors1) + end + end. + +handle_call_result(MRef, Result, Refs, AccList) -> + %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2} + %% here, so we must cope with MRefs that we've already seen and erased + case dict:find(MRef, Refs) of + {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}; + _ -> {Refs, AccList} + end. + %% ----------------------------------------------------------------- %% Apply a function to a generic server's state. %% ----------------------------------------------------------------- diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a7438a3f..c0478579 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -354,14 +354,14 @@ with(Name, F, E) -> {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do - %% with the QPid. + %% with the QPid. F() should be written s.t. that this + %% cannot happen, so we bail if it does since that + %% indicates a code bug and we don't want to get stuck in + %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> - case rabbit_misc:is_process_alive(QPid) of - true -> E(not_found_or_absent_dirty(Name)); - false -> timer:sleep(25), - with(Name, F, E) - end + fun () -> false = rabbit_misc:is_process_alive(QPid), + timer:sleep(25), + with(Name, F, E) end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) @@ -507,8 +507,8 @@ force_event_refresh(Ref) -> force_event_refresh(QNames, Ref) -> Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], - {_, Bad} = rabbit_misc:multi_call( - [Q#amqqueue.pid || Q <- Qs], {force_event_refresh, Ref}), + {_, Bad} = gen_server2:mcall( + [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]), FailedPids = [Pid || {Pid, _Reason} <- Bad], Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs, lists:member(Pid, FailedPids)], diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d453c6d3..a1997376 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -20,8 +20,8 @@ -behaviour(gen_server2). --define(SYNC_INTERVAL, 25). %% milliseconds --define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(SYNC_INTERVAL, 200). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster -export([start_link/1, info_keys/0]). @@ -328,10 +328,13 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. -next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +next_state(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> assert_invariant(State), {MsgIds, BQS1} = BQ:drain_confirmed(BQS), - State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}), + MTC1 = confirm_messages(MsgIds, MTC), + State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}, case BQ:needs_timeout(BQS1) of false -> {stop_sync_timer(State1), hibernate }; idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; @@ -412,9 +415,9 @@ maybe_send_drained(WasEmpty, State) -> end, State. -confirm_messages([], State) -> - State; -confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> +confirm_messages([], MTC) -> + MTC; +confirm_messages(MsgIds, MTC) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> @@ -428,7 +431,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> end end, {gb_trees:empty(), MTC}, MsgIds), rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), - State#q{msg_id_to_channel = MTC1}. + MTC1. send_or_record_confirm(#delivery{confirm = false}, State) -> {never, State}; @@ -448,23 +451,22 @@ send_or_record_confirm(#delivery{confirm = true, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. -send_mandatory(#delivery{mandatory = false}) -> +send_mandatory(#delivery{mandatory = false}) -> ok; send_mandatory(#delivery{mandatory = true, sender = SenderPid, msg_seq_no = MsgSeqNo}) -> gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). -discard(#delivery{sender = SenderPid, - msg_seq_no = MsgSeqNo, - message = #basic_message{id = MsgId}}, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - case MsgSeqNo of - undefined -> State; - _ -> confirm_messages([MsgId], State) - end, +discard(#delivery{confirm = Confirm, + sender = SenderPid, + message = #basic_message{id = MsgId}}, BQ, BQS, MTC) -> + MTC1 = case Confirm of + true -> confirm_messages([MsgId], MTC); + false -> MTC + end, BQS1 = BQ:discard(MsgId, SenderPid, BQS), - State1#q{backing_queue_state = BQS1}. + {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -487,20 +489,22 @@ run_message_queue(ActiveConsumersChanged, State) -> attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> case rabbit_queue_consumers:deliver( fun (true) -> true = BQ:is_empty(BQS), {AckTag, BQS1} = BQ:publish_delivered( Message, Props, SenderPid, BQS), - {{Message, Delivered, AckTag}, - State#q{backing_queue_state = BQS1}}; + {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, - discard(Delivery, State)} + discard(Delivery, BQ, BQS, MTC)} end, qname(State), State#q.consumers) of - {delivered, ActiveConsumersChanged, State1, Consumers} -> + {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} -> {delivered, maybe_notify_decorators( ActiveConsumersChanged, - State1#q{consumers = Consumers})}; + State#q{backing_queue_state = BQS1, + msg_id_to_channel = MTC1, + consumers = Consumers})}; {undelivered, ActiveConsumersChanged, Consumers} -> {undelivered, maybe_notify_decorators( ActiveConsumersChanged, @@ -512,7 +516,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms {Confirm, State1} = send_or_record_confirm(Delivery, State), - Props = message_properties(Message, Confirm, State), + Props = message_properties(Message, Confirm, State1), {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), State2 = State1#q{backing_queue_state = BQS1}, case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, @@ -522,8 +526,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {delivered, State3} -> State3; %% The next one is an optimisation - {undelivered, State3 = #q{ttl = 0, dlx = undefined}} -> - discard(Delivery, State3); + {undelivered, State3 = #q{ttl = 0, dlx = undefined, + backing_queue_state = BQS2, + msg_id_to_channel = MTC}} -> + {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), + State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; {undelivered, State3 = #q{backing_queue_state = BQS2}} -> BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), {Dropped, State4 = #q{backing_queue_state = BQS4}} = diff --git a/src/rabbit_auth_backend_dummy.erl b/src/rabbit_auth_backend_dummy.erl new file mode 100644 index 00000000..1a3db732 --- /dev/null +++ b/src/rabbit_auth_backend_dummy.erl @@ -0,0 +1,49 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_auth_backend_dummy). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_backend). + +-export([description/0]). +-export([user/0]). +-export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). + +-ifdef(use_specs). + +-spec(user/0 :: () -> rabbit_types:user()). + +-endif. + +%% A user to be used by the direct client when permission checks are +%% not needed. This user can do anything AMQPish. +user() -> #user{username = <<"dummy">>, + tags = [], + auth_backend = ?MODULE, + impl = none}. + +%% Implementation of rabbit_auth_backend + +description() -> + [{name, <<"Dummy">>}, + {description, <<"Database for the dummy user">>}]. + +check_user_login(_, _) -> + {refused, "cannot log in conventionally as dummy user", []}. + +check_vhost_access(#user{}, _VHostPath) -> true. +check_resource_access(#user{}, #resource{}, _Permission) -> true. diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 61919d05..ebeac1f7 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -22,15 +22,18 @@ -export([description/0]). -export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). --export([add_user/2, delete_user/1, change_password/2, set_tags/2, - list_users/0, user_info_keys/0, lookup_user/1, clear_password/1]). --export([make_salt/0, check_password/2, change_password_hash/2, - hash_password/1]). --export([set_permissions/5, clear_permissions/2, - list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, - list_user_vhost_permissions/2, perms_info_keys/0, - vhost_perms_info_keys/0, user_perms_info_keys/0, - user_vhost_perms_info_keys/0]). +-export([add_user/2, delete_user/1, lookup_user/1, + change_password/2, clear_password/1, + hash_password/1, change_password_hash/2, + set_tags/2, set_permissions/5, clear_permissions/2]). +-export([user_info_keys/0, perms_info_keys/0, + user_perms_info_keys/0, vhost_perms_info_keys/0, + user_vhost_perms_info_keys/0, + list_users/0, list_permissions/0, + list_user_permissions/1, list_vhost_permissions/1, + list_user_vhost_permissions/2]). + +%%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -38,45 +41,39 @@ -spec(add_user/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok'). -spec(delete_user/1 :: (rabbit_types:username()) -> 'ok'). +-spec(lookup_user/1 :: (rabbit_types:username()) + -> rabbit_types:ok(rabbit_types:internal_user()) + | rabbit_types:error('not_found')). -spec(change_password/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok'). -spec(clear_password/1 :: (rabbit_types:username()) -> 'ok'). --spec(make_salt/0 :: () -> binary()). --spec(check_password/2 :: (rabbit_types:password(), - rabbit_types:password_hash()) -> boolean()). --spec(change_password_hash/2 :: (rabbit_types:username(), - rabbit_types:password_hash()) -> 'ok'). -spec(hash_password/1 :: (rabbit_types:password()) -> rabbit_types:password_hash()). +-spec(change_password_hash/2 :: (rabbit_types:username(), + rabbit_types:password_hash()) -> 'ok'). -spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok'). --spec(list_users/0 :: () -> [rabbit_types:infos()]). --spec(user_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(lookup_user/1 :: (rabbit_types:username()) - -> rabbit_types:ok(rabbit_types:internal_user()) - | rabbit_types:error('not_found')). -spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) -> 'ok'). +-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(user_vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(list_users/0 :: () -> [rabbit_types:infos()]). -spec(list_permissions/0 :: () -> [rabbit_types:infos()]). --spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_user_permissions/1 :: (rabbit_types:username()) -> [rabbit_types:infos()]). +-spec(list_vhost_permissions/1 :: + (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_user_vhost_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) -> [rabbit_types:infos()]). --spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()). --spec(user_vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). + -endif. %%---------------------------------------------------------------------------- - --define(PERMS_INFO_KEYS, [configure, write, read]). --define(USER_INFO_KEYS, [user, tags]). - %% Implementation of rabbit_auth_backend description() -> @@ -85,11 +82,14 @@ description() -> check_user_login(Username, []) -> internal_check_user_login(Username, fun(_) -> true end); -check_user_login(Username, [{password, Password}]) -> +check_user_login(Username, [{password, Cleartext}]) -> internal_check_user_login( - Username, fun(#internal_user{password_hash = Hash}) -> - check_password(Password, Hash) - end); + Username, + fun (#internal_user{password_hash = <<Salt:4/binary, Hash/binary>>}) -> + Hash =:= salted_md5(Salt, Cleartext); + (#internal_user{}) -> + false + end); check_user_login(Username, AuthProps) -> exit({unknown_auth_props, Username, AuthProps}). @@ -145,42 +145,43 @@ permission_index(read) -> #permission.read. add_user(Username, Password) -> rabbit_log:info("Creating user '~s'~n", [Username]), - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_user, Username}) of - [] -> - ok = mnesia:write( - rabbit_user, - #internal_user{username = Username, - password_hash = - hash_password(Password), - tags = []}, - write); - _ -> - mnesia:abort({user_already_exists, Username}) - end - end), - R. + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_user, Username}) of + [] -> + ok = mnesia:write( + rabbit_user, + #internal_user{username = Username, + password_hash = + hash_password(Password), + tags = []}, + write); + _ -> + mnesia:abort({user_already_exists, Username}) + end + end). delete_user(Username) -> rabbit_log:info("Deleting user '~s'~n", [Username]), - R = rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - ok = mnesia:delete({rabbit_user, Username}), - [ok = mnesia:delete_object( - rabbit_user_permission, R, write) || - R <- mnesia:match_object( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = '_'}, - permission = '_'}, - write)], - ok - end)), - R. + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + ok = mnesia:delete({rabbit_user, Username}), + [ok = mnesia:delete_object( + rabbit_user_permission, R, write) || + R <- mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = '_'}, + permission = '_'}, + write)], + ok + end)). + +lookup_user(Username) -> + rabbit_misc:dirty_read({rabbit_user, Username}). change_password(Username, Password) -> rabbit_log:info("Changing password for '~s'~n", [Username]), @@ -190,70 +191,44 @@ clear_password(Username) -> rabbit_log:info("Clearing password for '~s'~n", [Username]), change_password_hash(Username, <<"">>). -change_password_hash(Username, PasswordHash) -> - R = update_user(Username, fun(User) -> - User#internal_user{ - password_hash = PasswordHash } - end), - R. - hash_password(Cleartext) -> - Salt = make_salt(), - Hash = salted_md5(Salt, Cleartext), - <<Salt/binary, Hash/binary>>. - -check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) -> - Hash =:= salted_md5(Salt, Cleartext); -check_password(_Cleartext, _Any) -> - false. - -make_salt() -> {A1,A2,A3} = now(), random:seed(A1, A2, A3), Salt = random:uniform(16#ffffffff), - <<Salt:32>>. + SaltBin = <<Salt:32>>, + Hash = salted_md5(SaltBin, Cleartext), + <<SaltBin/binary, Hash/binary>>. + +change_password_hash(Username, PasswordHash) -> + update_user(Username, fun(User) -> + User#internal_user{ + password_hash = PasswordHash } + end). salted_md5(Salt, Cleartext) -> Salted = <<Salt/binary, Cleartext/binary>>, erlang:md5(Salted). set_tags(Username, Tags) -> - rabbit_log:info("Setting user tags for user '~s' to ~p~n", [Username, Tags]), - R = update_user(Username, fun(User) -> - User#internal_user{tags = Tags} - end), - R. - -update_user(Username, Fun) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - {ok, User} = lookup_user(Username), - ok = mnesia:write(rabbit_user, Fun(User), write) - end)). - -list_users() -> - [[{user, Username}, {tags, Tags}] || - #internal_user{username = Username, tags = Tags} <- - mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. - -user_info_keys() -> ?USER_INFO_KEYS. - -lookup_user(Username) -> - rabbit_misc:dirty_read({rabbit_user, Username}). - -validate_regexp(RegexpBin) -> - Regexp = binary_to_list(RegexpBin), - case re:compile(Regexp) of - {ok, _} -> ok; - {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) - end. + rabbit_log:info("Setting user tags for user '~s' to ~p~n", + [Username, Tags]), + update_user(Username, fun(User) -> + User#internal_user{tags = Tags} + end). set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> - rabbit_log:info("Setting permissions for '~s' in '~s' to '~s', '~s', '~s'~n", + rabbit_log:info("Setting permissions for " + "'~s' in '~s' to '~s', '~s', '~s'~n", [Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]), - lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), + lists:map( + fun (RegexpBin) -> + Regexp = binary_to_list(RegexpBin), + case re:compile(Regexp) of + {ok, _} -> ok; + {error, Reason} -> throw({error, {invalid_regexp, + Regexp, Reason}}) + end + end, [ConfigurePerm, WritePerm, ReadPerm]), rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -269,7 +244,6 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> write) end)). - clear_permissions(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( @@ -280,32 +254,36 @@ clear_permissions(Username, VHostPath) -> virtual_host = VHostPath}}) end)). +update_user(Username, Fun) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + {ok, User} = lookup_user(Username), + ok = mnesia:write(rabbit_user, Fun(User), write) + end)). + +%%---------------------------------------------------------------------------- +%% Listing + +-define(PERMS_INFO_KEYS, [configure, write, read]). +-define(USER_INFO_KEYS, [user, tags]). + +user_info_keys() -> ?USER_INFO_KEYS. + perms_info_keys() -> [user, vhost | ?PERMS_INFO_KEYS]. vhost_perms_info_keys() -> [user | ?PERMS_INFO_KEYS]. user_perms_info_keys() -> [vhost | ?PERMS_INFO_KEYS]. user_vhost_perms_info_keys() -> ?PERMS_INFO_KEYS. +list_users() -> + [[{user, Username}, {tags, Tags}] || + #internal_user{username = Username, tags = Tags} <- + mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. + list_permissions() -> list_permissions(perms_info_keys(), match_user_vhost('_', '_')). -list_vhost_permissions(VHostPath) -> - list_permissions( - vhost_perms_info_keys(), - rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))). - -list_user_permissions(Username) -> - list_permissions( - user_perms_info_keys(), - rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))). - -list_user_vhost_permissions(Username, VHostPath) -> - list_permissions( - user_vhost_perms_info_keys(), - rabbit_misc:with_user_and_vhost( - Username, VHostPath, match_user_vhost(Username, VHostPath))). - -filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)]. - list_permissions(Keys, QueryThunk) -> [filter_props(Keys, [{user, Username}, {vhost, VHostPath}, @@ -320,6 +298,24 @@ list_permissions(Keys, QueryThunk) -> %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction(QueryThunk)]. +filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)]. + +list_user_permissions(Username) -> + list_permissions( + user_perms_info_keys(), + rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))). + +list_vhost_permissions(VHostPath) -> + list_permissions( + vhost_perms_info_keys(), + rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))). + +list_user_vhost_permissions(Username, VHostPath) -> + list_permissions( + user_vhost_perms_info_keys(), + rabbit_misc:with_user_and_vhost( + Username, VHostPath, match_user_vhost(Username, VHostPath))). + match_user_vhost(Username, VHostPath) -> fun () -> mnesia:match_object( rabbit_user_permission, diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 11e6bd38..bb9c61a8 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -169,7 +169,7 @@ add(Binding, InnerFun) -> ok -> case mnesia:read({rabbit_route, B}) of [] -> add(Src, Dst, B); - [_] -> fun rabbit_misc:const_ok/0 + [_] -> fun () -> ok end end; {error, _} = Err -> rabbit_misc:const(Err) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8157e82d..7907c96c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -497,15 +497,14 @@ check_user_id_header(#'P_basic'{user_id = undefined}, _) -> check_user_id_header(#'P_basic'{user_id = Username}, #ch{user = #user{username = Username}}) -> ok; +check_user_id_header( + #'P_basic'{}, #ch{user = #user{auth_backend = rabbit_auth_backend_dummy}}) -> + ok; check_user_id_header(#'P_basic'{user_id = Claimed}, - #ch{user = #user{username = Actual, - tags = Tags}}) -> - case lists:member(impersonator, Tags) of - true -> ok; - false -> precondition_failed( - "user_id property set to '~s' but authenticated user was " - "'~s'", [Claimed, Actual]) - end. + #ch{user = #user{username = Actual}}) -> + precondition_failed( + "user_id property set to '~s' but authenticated user was '~s'", + [Claimed, Actual]). check_expiration_header(Props) -> case rabbit_basic:parse_expiration(Props) of @@ -1440,8 +1439,9 @@ notify_limiter(Limiter, Acked) -> end end. -deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, - mandatory = false}, +deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, + confirm = false, + mandatory = false}, []}, State) -> %% optimisation ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), State; diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 2bd22579..49f7e388 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -51,8 +51,11 @@ behaviour_info(_Other) -> %%---------------------------------------------------------------------------- -intercept_method(#'basic.publish'{} = M, _VHost) -> - M; +intercept_method(#'basic.publish'{} = M, _VHost) -> M; +intercept_method(#'basic.ack'{} = M, _VHost) -> M; +intercept_method(#'basic.nack'{} = M, _VHost) -> M; +intercept_method(#'basic.reject'{} = M, _VHost) -> M; +intercept_method(#'basic.credit'{} = M, _VHost) -> M; intercept_method(M, VHost) -> intercept_method(M, VHost, select(rabbit_misc:method_record_type(M))). @@ -87,5 +90,7 @@ select(Method) -> validate_method(M, M2) -> rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2). +%% keep dialyzer happy +-spec internal_error(string(), [any()]) -> no_return(). internal_error(Format, Args) -> - rabbit_misc:protocol_error(internal_error, Format, Args).
\ No newline at end of file + rabbit_misc:protocol_error(internal_error, Format, Args). diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index f3463286..746f2bdb 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -90,6 +90,7 @@ status, environment, report, + set_cluster_name, eval, close_connection, @@ -527,6 +528,10 @@ action(report, Node, _Args, _Opts, Inform) -> [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts], ok; +action(set_cluster_name, Node, [Name], _Opts, Inform) -> + Inform("Setting cluster name to ~s", [Name]), + rpc_call(Node, rabbit_nodes, set_cluster_name, [list_to_binary(Name)]); + action(eval, Node, [Expr], _Opts, _Inform) -> case erl_scan:string(Expr) of {ok, Scanned, _} -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index e6e4b4cc..c372d5f1 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -31,7 +31,7 @@ -spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). --spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user() | +-spec(connect/5 :: (('nouser' | {rabbit_types:username(), rabbit_types:password()}), rabbit_types:vhost(), rabbit_types:protocol(), pid(), rabbit_event:event_props()) -> @@ -67,37 +67,35 @@ list() -> %%---------------------------------------------------------------------------- -connect(User = #user{}, VHost, Protocol, Pid, Infos) -> - try rabbit_access_control:check_vhost_access(User, VHost) of - ok -> ok = pg_local:join(rabbit_direct, Pid), - rabbit_event:notify(connection_created, Infos), - {ok, {User, rabbit_reader:server_properties(Protocol)}} - catch - exit:#amqp_error{name = access_refused} -> - {error, access_refused} - end; - connect({Username, Password}, VHost, Protocol, Pid, Infos) -> connect0(fun () -> rabbit_access_control:check_user_pass_login( Username, Password) end, VHost, Protocol, Pid, Infos); -connect(Username, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> rabbit_access_control:check_user_login( - Username, []) end, +connect(nouser, VHost, Protocol, Pid, Infos) -> + connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end, VHost, Protocol, Pid, Infos). connect0(AuthFun, VHost, Protocol, Pid, Infos) -> case rabbit:is_running() of true -> case AuthFun() of {ok, User} -> - connect(User, VHost, Protocol, Pid, Infos); + connect1(User, VHost, Protocol, Pid, Infos); {refused, _M, _A} -> {error, {auth_failure, "Refused"}} end; false -> {error, broker_not_found_on_node} end. +connect1(User, VHost, Protocol, Pid, Infos) -> + try rabbit_access_control:check_vhost_access(User, VHost) of + ok -> ok = pg_local:join(rabbit_direct, Pid), + rabbit_event:notify(connection_created, Infos), + {ok, {User, rabbit_reader:server_properties(Protocol)}} + catch + exit:#amqp_error{name = access_refused} -> + {error, access_refused} + end. start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector) -> diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 8ba29deb..27b8d1e6 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -79,9 +79,9 @@ remove_bindings(transaction, _X, Bs) -> [begin Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), - trie_remove_binding(X, FinalNode, D), + trie_remove_binding(X, FinalNode, D, Args), remove_path_if_empty(X, Path) - end || #binding{source = X, key = K, destination = D} <- Bs], + end || #binding{source = X, key = K, destination = D, args = Args} <- Bs], ok; remove_bindings(none, _X, _Bs) -> ok. @@ -91,9 +91,10 @@ assert_args_equivalence(X, Args) -> %%---------------------------------------------------------------------------- -internal_add_binding(#binding{source = X, key = K, destination = D}) -> +internal_add_binding(#binding{source = X, key = K, destination = D, + args = Args}) -> FinalNode = follow_down_create(X, split_topic_key(K)), - trie_add_binding(X, FinalNode, D), + trie_add_binding(X, FinalNode, D, Args), ok. trie_match(X, Words) -> @@ -176,7 +177,8 @@ trie_bindings(X, Node) -> MatchHead = #topic_trie_binding{ trie_binding = #trie_binding{exchange_name = X, node_id = Node, - destination = '$1'}}, + destination = '$1', + arguments = '_'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). trie_update_node_counts(X, Node, Field, Delta) -> @@ -213,20 +215,21 @@ trie_edge_op(X, FromNode, ToNode, W, Op) -> node_id = ToNode}, write). -trie_add_binding(X, Node, D) -> +trie_add_binding(X, Node, D, Args) -> trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1), - trie_binding_op(X, Node, D, fun mnesia:write/3). + trie_binding_op(X, Node, D, Args, fun mnesia:write/3). -trie_remove_binding(X, Node, D) -> +trie_remove_binding(X, Node, D, Args) -> trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1), - trie_binding_op(X, Node, D, fun mnesia:delete_object/3). + trie_binding_op(X, Node, D, Args, fun mnesia:delete_object/3). -trie_binding_op(X, Node, D, Op) -> +trie_binding_op(X, Node, D, Args, Op) -> ok = Op(rabbit_topic_trie_binding, #topic_trie_binding{ trie_binding = #trie_binding{exchange_name = X, node_id = Node, - destination = D}}, + destination = D, + arguments = Args}}, write). trie_remove_all_nodes(X) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ca495733..4f77009c 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -148,53 +148,54 @@ drop_mirrors(QName, Nodes) -> ok. drop_mirror(QName, MirrorNode) -> - rabbit_amqqueue:with( - QName, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - {error, {queue_not_mirrored_on_node, MirrorNode}}; - [QPid] when SPids =:= [] -> - {error, cannot_drop_only_mirror}; - [Pid] -> - rabbit_log:info( - "Dropping queue mirror on node ~p for ~s~n", - [MirrorNode, rabbit_misc:rs(Name)]), - exit(Pid, {shutdown, dropped}), - {ok, dropped} - end - end). + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids }} -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + {error, {queue_not_mirrored_on_node, MirrorNode}}; + [QPid] when SPids =:= [] -> + {error, cannot_drop_only_mirror}; + [Pid] -> + rabbit_log:info( + "Dropping queue mirror on node ~p for ~s~n", + [MirrorNode, rabbit_misc:rs(Name)]), + exit(Pid, {shutdown, dropped}), + {ok, dropped} + end; + {error, not_found} = E -> + E + end. add_mirrors(QName, Nodes, SyncMode) -> [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. add_mirror(QName, MirrorNode, SyncMode) -> - rabbit_amqqueue:with( - QName, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - start_child(Name, MirrorNode, Q, SyncMode); - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> {ok, already_mirrored}; - false -> start_child(Name, MirrorNode, Q, SyncMode) - end - end - end). + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + start_child(Name, MirrorNode, Q, SyncMode); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> {ok, already_mirrored}; + false -> start_child(Name, MirrorNode, Q, SyncMode) + end + end; + {error, not_found} = E -> + E + end. start_child(Name, MirrorNode, Q, SyncMode) -> - case rabbit_misc:with_exit_handler( - rabbit_misc:const(down), - fun () -> - rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) - end) of - {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode); - _ -> ok - end. + rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun () -> + {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]), + rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid, SyncMode) + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 1c63980e..848c4a87 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -53,13 +53,12 @@ -export([parse_arguments/3]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). --export([const_ok/0, const/1]). +-export([const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). -export([pget/2, pget/3, pget_or_die/2, pset/3]). -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). --export([multi_call/2]). -export([os_cmd/1]). -export([gb_sets_difference/2]). -export([version/0, which_applications/0]). @@ -219,7 +218,6 @@ {bad_edge, [digraph:vertex()]}), digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). --spec(const_ok/0 :: () -> 'ok'). -spec(const/1 :: (A) -> thunk(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). @@ -230,8 +228,6 @@ -spec(pset/3 :: (term(), term(), [term()]) -> term()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). --spec(multi_call/2 :: - ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). -spec(version/0 :: () -> string()). @@ -891,7 +887,6 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> {error, Reason} end. -const_ok() -> ok. const(X) -> fun () -> X end. %% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see @@ -950,31 +945,6 @@ append_rpc_all_nodes(Nodes, M, F, A) -> _ -> Res end || Res <- ResL]). -%% A simplified version of gen_server:multi_call/2 with a sane -%% API. This is not in gen_server2 as there is no useful -%% infrastructure there to share. -multi_call(Pids, Req) -> - MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids], - receive_multi_call(MonitorPids, [], []). - -start_multi_call(Pid, Req) when is_pid(Pid) -> - Mref = erlang:monitor(process, Pid), - Pid ! {'$gen_call', {self(), Mref}, Req}, - {Mref, Pid}. - -receive_multi_call([], Good, Bad) -> - {lists:reverse(Good), lists:reverse(Bad)}; -receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) -> - receive - {Mref, Reply} -> - erlang:demonitor(Mref, [flush]), - receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad); - {'DOWN', Mref, _, _, noconnection} -> - receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]); - {'DOWN', Mref, _, _, Reason} -> - receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad]) - end. - os_cmd(Command) -> case os:type() of {win32, _} -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f27f77c6..59873ffc 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -327,6 +327,7 @@ status() -> case is_running() of true -> RunningNodes = cluster_nodes(running), [{running_nodes, RunningNodes}, + {cluster_name, rabbit_nodes:cluster_name()}, {partitions, mnesia_partitions(RunningNodes)}]; false -> [] end. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 5a1613a7..c5aa8473 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -17,7 +17,8 @@ -module(rabbit_nodes). -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, - is_running/2, is_process_running/2, fqdn_nodename/0]). + is_running/2, is_process_running/2, + cluster_name/0, set_cluster_name/1]). -include_lib("kernel/include/inet.hrl"). @@ -37,7 +38,8 @@ -spec(cookie_hash/0 :: () -> string()). -spec(is_running/2 :: (node(), atom()) -> boolean()). -spec(is_process_running/2 :: (node(), atom()) -> boolean()). --spec(fqdn_nodename/0 :: () -> binary()). +-spec(cluster_name/0 :: () -> binary()). +-spec(set_cluster_name/1 :: (binary()) -> 'ok'). -endif. @@ -111,8 +113,15 @@ is_process_running(Node, Process) -> P when is_pid(P) -> true end. -fqdn_nodename() -> +cluster_name() -> + rabbit_runtime_parameters:value_global( + cluster_name, cluster_name_default()). + +cluster_name_default() -> {ID, _} = rabbit_nodes:parts(node()), {ok, Host} = inet:gethostname(), {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host), list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))). + +set_cluster_name(Name) -> + rabbit_runtime_parameters:set_global(cluster_name, Name). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 833c6e27..9ffcd203 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -156,19 +156,23 @@ server_properties(Protocol) -> [case X of {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)), longstr, - list_to_binary(Value)}; + maybe_list_to_binary(Value)}; {BinKey, Type, Value} -> {BinKey, Type, Value} end || X <- RawConfigServerProps ++ - [{product, Product}, - {version, Version}, - {platform, "Erlang/OTP"}, - {copyright, ?COPYRIGHT_MESSAGE}, - {information, ?INFORMATION_MESSAGE}]]], + [{product, Product}, + {version, Version}, + {cluster_name, rabbit_nodes:cluster_name()}, + {platform, "Erlang/OTP"}, + {copyright, ?COPYRIGHT_MESSAGE}, + {information, ?INFORMATION_MESSAGE}]]], %% Filter duplicated properties in favour of config file provided values lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, NormalizedConfigServerProps). +maybe_list_to_binary(V) when is_binary(V) -> V; +maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V). + server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, @@ -958,6 +962,9 @@ validate_negotiated_integer_value(Field, Min, ClientValue) -> ok end. +%% keep dialyzer happy +-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) -> + no_return(). fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> {S1, S2} = case MinOrMax of min -> {lower, minimum}; diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index bcde0078..18b9fbb8 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -22,6 +22,8 @@ list_component/1, list/2, list_formatted/1, lookup/3, value/3, value/4, info_keys/0]). +-export([set_global/2, value_global/1, value_global/2]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -34,6 +36,7 @@ -> ok_or_error_string()). -spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> ok_or_error_string()). +-spec(set_global/2 :: (atom(), term()) -> 'ok'). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) -> ok_or_error_string()). -spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary()) @@ -48,6 +51,8 @@ -> rabbit_types:infos() | 'not_found'). -spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()). -spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()). +-spec(value_global/1 :: (atom()) -> term() | 'not_found'). +-spec(value_global/2 :: (atom(), term()) -> term()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -endif. @@ -74,6 +79,10 @@ set(_, <<"policy">>, _, _) -> set(VHost, Component, Name, Term) -> set_any(VHost, Component, Name, Term). +set_global(Name, Term) -> + mnesia_update(Name, Term), + ok. + format_error(L) -> {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. @@ -100,16 +109,22 @@ set_any0(VHost, Component, Name, Term) -> E end. +mnesia_update(Key, Term) -> + rabbit_misc:execute_mnesia_transaction(mnesia_update_fun(Key, Term)). + mnesia_update(VHost, Comp, Name, Term) -> - F = fun () -> - Res = case mnesia:read(?TABLE, {VHost, Comp, Name}, read) of - [] -> new; - [Params] -> {old, Params#runtime_parameters.value} + rabbit_misc:execute_mnesia_transaction( + rabbit_vhost:with(VHost, mnesia_update_fun({VHost, Comp, Name}, Term))). + +mnesia_update_fun(Key, Term) -> + fun () -> + Res = case mnesia:read(?TABLE, Key, read) of + [] -> new; + [Params] -> {old, Params#runtime_parameters.value} end, - ok = mnesia:write(?TABLE, c(VHost, Comp, Name, Term), write), - Res - end, - rabbit_misc:execute_mnesia_transaction(rabbit_vhost:with(VHost, F)). + ok = mnesia:write(?TABLE, c(Key, Term), write), + Res + end. clear(_, <<"policy">> , _) -> {error_string, "policies may not be cleared using this method"}; @@ -159,43 +174,46 @@ list_formatted(VHost) -> [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. lookup(VHost, Component, Name) -> - case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of + case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> p(Params) end. -value(VHost, Component, Name) -> - case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of +value(VHost, Comp, Name) -> value0({VHost, Comp, Name}). +value(VHost, Comp, Name, Def) -> value0({VHost, Comp, Name}, Def). + +value_global(Key) -> value0(Key). +value_global(Key, Default) -> value0(Key, Default). + +value0(Key) -> + case lookup0(Key, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> Params#runtime_parameters.value end. -value(VHost, Component, Name, Default) -> - Params = lookup0(VHost, Component, Name, - fun () -> - lookup_missing(VHost, Component, Name, Default) - end), +value0(Key, Default) -> + Params = lookup0(Key, fun () -> lookup_missing(Key, Default) end), Params#runtime_parameters.value. -lookup0(VHost, Component, Name, DefaultFun) -> - case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of +lookup0(Key, DefaultFun) -> + case mnesia:dirty_read(?TABLE, Key) of [] -> DefaultFun(); [R] -> R end. -lookup_missing(VHost, Component, Name, Default) -> +lookup_missing(Key, Default) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read(?TABLE, {VHost, Component, Name}, read) of - [] -> Record = c(VHost, Component, Name, Default), + case mnesia:read(?TABLE, Key, read) of + [] -> Record = c(Key, Default), mnesia:write(?TABLE, Record, write), Record; [R] -> R end end). -c(VHost, Component, Name, Default) -> - #runtime_parameters{key = {VHost, Component, Name}, +c(Key, Default) -> + #runtime_parameters{key = Key, value = Default}. p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2d6ff73b..33c6354b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -39,7 +39,6 @@ all_tests() -> application:set_env(rabbit, file_handles_high_watermark, 10, infinity), ok = file_handle_cache:set_limit(10), passed = test_version_equivalance(), - passed = test_multi_call(), passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_rabbit_basic_header_handling(), @@ -66,6 +65,7 @@ all_tests() -> passed = test_amqp_connection_refusal(), passed = test_confirms(), passed = test_with_state(), + passed = test_mcall(), passed = do_if_secondary_node( fun run_cluster_dependent_tests/1, @@ -156,26 +156,6 @@ test_version_equivalance() -> false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"), passed. -test_multi_call() -> - Fun = fun() -> - receive - {'$gen_call', {From, Mref}, request} -> - From ! {Mref, response} - end, - receive - never -> ok - end - end, - Pid1 = spawn(Fun), - Pid2 = spawn(Fun), - Pid3 = spawn(Fun), - exit(Pid2, bang), - {[{Pid1, response}, {Pid3, response}], [{Pid2, _Fail}]} = - rabbit_misc:multi_call([Pid1, Pid2, Pid3], request), - exit(Pid1, bang), - exit(Pid3, bang), - passed. - test_rabbit_basic_header_handling() -> passed = write_table_with_invalid_existing_type_test(), passed = invalid_existing_headers_test(), @@ -578,33 +558,38 @@ test_topic_matching() -> key = list_to_binary(Key), destination = #resource{virtual_host = <<"/">>, kind = queue, - name = list_to_binary(Q)}} || - {Key, Q} <- [{"a.b.c", "t1"}, - {"a.*.c", "t2"}, - {"a.#.b", "t3"}, - {"a.b.b.c", "t4"}, - {"#", "t5"}, - {"#.#", "t6"}, - {"#.b", "t7"}, - {"*.*", "t8"}, - {"a.*", "t9"}, - {"*.b.c", "t10"}, - {"a.#", "t11"}, - {"a.#.#", "t12"}, - {"b.b.c", "t13"}, - {"a.b.b", "t14"}, - {"a.b", "t15"}, - {"b.c", "t16"}, - {"", "t17"}, - {"*.*.*", "t18"}, - {"vodka.martini", "t19"}, - {"a.b.c", "t20"}, - {"*.#", "t21"}, - {"#.*.#", "t22"}, - {"*.#.#", "t23"}, - {"#.#.#", "t24"}, - {"*", "t25"}, - {"#.b.#", "t26"}]], + name = list_to_binary(Q)}, + args = Args} || + {Key, Q, Args} <- [{"a.b.c", "t1", []}, + {"a.*.c", "t2", []}, + {"a.#.b", "t3", []}, + {"a.b.b.c", "t4", []}, + {"#", "t5", []}, + {"#.#", "t6", []}, + {"#.b", "t7", []}, + {"*.*", "t8", []}, + {"a.*", "t9", []}, + {"*.b.c", "t10", []}, + {"a.#", "t11", []}, + {"a.#.#", "t12", []}, + {"b.b.c", "t13", []}, + {"a.b.b", "t14", []}, + {"a.b", "t15", []}, + {"b.c", "t16", []}, + {"", "t17", []}, + {"*.*.*", "t18", []}, + {"vodka.martini", "t19", []}, + {"a.b.c", "t20", []}, + {"*.#", "t21", []}, + {"#.*.#", "t22", []}, + {"*.#.#", "t23", []}, + {"#.#.#", "t24", []}, + {"*", "t25", []}, + {"#.b.#", "t26", []}, + {"args-test", "t27", + [{<<"foo">>, longstr, <<"bar">>}]}, + {"args-test", "t27", %% Note aliasing + [{<<"foo">>, longstr, <<"baz">>}]}]], lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, Bindings), @@ -631,12 +616,13 @@ test_topic_matching() -> "t22", "t23", "t24", "t26"]}, {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]}, {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", - "t25"]}]), - + "t25"]}, + {"args-test", ["t5", "t6", "t21", "t22", "t23", "t24", + "t25", "t27"]}]), %% remove some bindings RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings), lists:nth(11, Bindings), lists:nth(19, Bindings), - lists:nth(21, Bindings)], + lists:nth(21, Bindings), lists:nth(28, Bindings)], exchange_op_callback(X, remove_bindings, [RemovedBindings]), RemainingBindings = ordsets:to_list( ordsets:subtract(ordsets:from_list(Bindings), @@ -659,7 +645,8 @@ test_topic_matching() -> {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", "t24", "t26"]}, {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, - {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), + {"oneword", ["t6", "t22", "t23", "t24", "t25"]}, + {"args-test", ["t6", "t22", "t23", "t24", "t25", "t27"]}]), %% remove the entire exchange exchange_op_callback(X, delete, [RemainingBindings]), @@ -1042,6 +1029,9 @@ test_user_management() -> ok = control_action(add_vhost, ["/testhost"]), ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], [{"-p", "/testhost"}]), + {new, _} = rabbit_amqqueue:declare( + rabbit_misc:r(<<"/testhost">>, queue, <<"test">>), + true, false, [], none), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion @@ -1368,6 +1358,82 @@ test_with_state() -> fun (S) -> element(1, S) end), passed. +test_mcall() -> + P1 = spawn(fun gs2_test_listener/0), + register(foo, P1), + global:register_name(gfoo, P1), + + P2 = spawn(fun() -> exit(bang) end), + %% ensure P2 is dead (ignore the race setting up the monitor) + await_exit(P2), + + P3 = spawn(fun gs2_test_crasher/0), + + %% since P2 crashes almost immediately and P3 after receiving its first + %% message, we have to spawn a few more processes to handle the additional + %% cases we're interested in here + register(baz, spawn(fun gs2_test_crasher/0)), + register(bog, spawn(fun gs2_test_crasher/0)), + global:register_name(gbaz, spawn(fun gs2_test_crasher/0)), + + NoNode = rabbit_nodes:make("nonode"), + + Targets = + %% pids + [P1, P2, P3] + ++ + %% registered names + [foo, bar, baz] + ++ + %% {Name, Node} pairs + [{foo, node()}, {bar, node()}, {bog, node()}, {foo, NoNode}] + ++ + %% {global, Name} + [{global, gfoo}, {global, gbar}, {global, gbaz}], + + GoodResults = [{D, goodbye} || D <- [P1, foo, + {foo, node()}, + {global, gfoo}]], + + BadResults = [{P2, noproc}, % died before use + {P3, boom}, % died on first use + {bar, noproc}, % never registered + {baz, boom}, % died on first use + {{bar, node()}, noproc}, % never registered + {{bog, node()}, boom}, % died on first use + {{foo, NoNode}, nodedown}, % invalid node + {{global, gbar}, noproc}, % never registered globally + {{global, gbaz}, boom}], % died on first use + + {Replies, Errors} = gen_server2:mcall([{T, hello} || T <- Targets]), + true = lists:sort(Replies) == lists:sort(GoodResults), + true = lists:sort(Errors) == lists:sort(BadResults), + + %% cleanup (ignore the race setting up the monitor) + P1 ! stop, + await_exit(P1), + passed. + +await_exit(Pid) -> + MRef = erlang:monitor(process, Pid), + receive + {'DOWN', MRef, _, _, _} -> ok + end. + +gs2_test_crasher() -> + receive + {'$gen_call', _From, hello} -> exit(boom) + end. + +gs2_test_listener() -> + receive + {'$gen_call', From, hello} -> + gen_server2:reply(From, goodbye), + gs2_test_listener(); + stop -> + ok + end. + test_statistics_event_receiver(Pid) -> receive Foo -> Pid ! Foo, test_statistics_event_receiver(Pid) diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 90372461..4cb3cacc 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -47,6 +47,7 @@ -rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). +-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). %% ------------------------------------------------------------------- @@ -355,6 +356,32 @@ internal_system_x() -> [name, type, durable, auto_delete, internal, arguments, scratches, policy, decorators]). +cluster_name() -> + {atomic, ok} = mnesia:transaction(fun cluster_name_tx/0), + ok. + +cluster_name_tx() -> + %% mnesia:transform_table/4 does not let us delete records + T = rabbit_runtime_parameters, + mnesia:write_lock_table(T), + Ks = [K || {_VHost, <<"federation">>, <<"local-nodename">>} = K + <- mnesia:all_keys(T)], + case Ks of + [] -> ok; + [K|Tl] -> [{runtime_parameters, _K, Name}] = mnesia:read(T, K, write), + R = {runtime_parameters, cluster_name, Name}, + mnesia:write(T, R, write), + case Tl of + [] -> ok; + _ -> {VHost, _, _} = K, + error_logger:warning_msg( + "Multiple local-nodenames found, picking '~s' " + "from '~s' for cluster name~n", [Name, VHost]) + end + end, + [mnesia:delete(T, K, write) || K <- Ks], + ok. + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 020b5b33..321af4ac 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -643,6 +643,31 @@ drop(AckRequired, State) -> ack([], State) -> {[], State}; +%% optimisation: this head is essentially a partial evaluation of the +%% general case below, for the single-ack case. +ack([SeqId], State) -> + {#msg_status { msg_id = MsgId, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount, + ack_out_counter = AckOutCount }} = + remove_pending_ack(SeqId, State), + IndexState1 = case IndexOnDisk of + true -> rabbit_queue_index:ack([SeqId], IndexState); + false -> IndexState + end, + case MsgOnDisk of + true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); + false -> ok + end, + PCount1 = PCount - one_if(IsPersistent), + {[MsgId], + a(State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + 1 })}; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, @@ -730,20 +755,20 @@ set_ram_duration_target( false -> reduce_memory_use(State1) end). -update_rates(State = #vqstate{ in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, +update_rates(State = #vqstate{ in_counter = InCount, + out_counter = OutCount, + ack_in_counter = AckInCount, ack_out_counter = AckOutCount, - rates = #rates{ in = InRate, - out = OutRate, - ack_in = AckInRate, + rates = #rates{ in = InRate, + out = OutRate, + ack_in = AckInRate, ack_out = AckOutRate, timestamp = TS }}) -> Now = erlang:now(), - Rates = #rates { in = update_rate(Now, TS, InCount, InRate), - out = update_rate(Now, TS, OutCount, OutRate), - ack_in = update_rate(Now, TS, AckInCount, AckInRate), + Rates = #rates { in = update_rate(Now, TS, InCount, InRate), + out = update_rate(Now, TS, OutCount, OutRate), + ack_in = update_rate(Now, TS, AckInCount, AckInRate), ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), timestamp = Now }, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 047bce77..9fa4da44 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -83,9 +83,9 @@ delete(VHostPath) -> %% eventually the termination of that process. Exchange deletion causes %% notifications which must be sent outside the TX rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]), - [{ok,_} = rabbit_amqqueue:delete(Q, false, false) || + [assert_benign(rabbit_amqqueue:delete(Q, false, false)) || Q <- rabbit_amqqueue:list(VHostPath)], - [ok = rabbit_exchange:delete(Name, false) || + [assert_benign(rabbit_exchange:delete(Name, false)) || #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], R = rabbit_misc:execute_mnesia_transaction( with(VHostPath, fun () -> @@ -94,6 +94,18 @@ delete(VHostPath) -> ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), R. +assert_benign(ok) -> ok; +assert_benign({ok, _}) -> ok; +assert_benign({error, not_found}) -> ok; +assert_benign({error, {absent, Q}}) -> + %% We have a durable queue on a down node. Removing the mnesia + %% entries here is safe. If/when the down node restarts, it will + %% clear out the on-disk storage of the queue. + case rabbit_amqqueue:internal_delete(Q#amqqueue.name) of + ok -> ok; + {error, not_found} -> ok + end. + internal_delete(VHostPath) -> [ok = rabbit_auth_backend_internal:clear_permissions( proplists:get_value(user, Info), VHostPath) |