diff options
58 files changed, 2439 insertions, 1360 deletions
@@ -170,7 +170,7 @@ start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ - ./scripts/rabbitmq-server ; sleep 1 + ./scripts/rabbitmq-server; sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 6b02abe4..2152cab3 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -398,7 +398,12 @@ <refsect2> <title>User management</title> - + <para> + Note that <command>rabbitmqctl</command> manages the RabbitMQ + internal user database. Users from any alternative + authentication backend will not be visible + to <command>rabbitmqctl</command>. + </para> <variablelist> <varlistentry> <term><cmdsynopsis><command>add_user</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>password</replaceable></arg></cmdsynopsis></term> @@ -466,6 +471,25 @@ </varlistentry> <varlistentry> + <term><cmdsynopsis><command>clear_password</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user whose password is to be cleared.</para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_password tonyg</screen> + <para role="example"> + This command instructs the RabbitMQ broker to clear the + password for the user named + <command>tonyg</command>. This user now cannot log in with a password (but may be able to through e.g. SASL EXTERNAL if configured). + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> @@ -525,7 +549,12 @@ <refsect2> <title>Access control</title> - + <para> + Note that <command>rabbitmqctl</command> manages the RabbitMQ + internal user database. Permissions for users from any + alternative authorisation backend will not be visible + to <command>rabbitmqctl</command>. + </para> <variablelist> <varlistentry> <term><cmdsynopsis><command>add_vhost</command> <arg choice="req"><replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> @@ -847,6 +876,10 @@ <listitem><para>Whether the exchange will be deleted automatically when no longer used.</para></listitem> </varlistentry> <varlistentry> + <term>internal</term> + <listitem><para>Whether the exchange is internal, i.e. cannot be directly published to by a client.</para></listitem> + </varlistentry> + <varlistentry> <term>arguments</term> <listitem><para>Exchange arguments.</para></listitem> </varlistentry> @@ -977,6 +1010,26 @@ connection is secured with SSL.</para></listitem> </varlistentry> <varlistentry> + <term>ssl_protocol</term> + <listitem><para>SSL protocol + (e.g. tlsv1)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_key_exchange</term> + <listitem><para>SSL key exchange algorithm + (e.g. rsa)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_cipher</term> + <listitem><para>SSL cipher algorithm + (e.g. aes_256_cbc)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_hash</term> + <listitem><para>SSL hash function + (e.g. sha)</para></listitem> + </varlistentry> + <varlistentry> <term>peer_cert_subject</term> <listitem><para>The subject of the peer's SSL certificate, in RFC4514 form.</para></listitem> @@ -1005,6 +1058,10 @@ <listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem> </varlistentry> <varlistentry> + <term>auth_mechanism</term> + <listitem><para>SASL authentication mechanism used, such as <command>PLAIN</command>.</para></listitem> + </varlistentry> + <varlistentry> <term>user</term> <listitem><para>Username associated with the connection.</para></listitem> </varlistentry> @@ -1054,7 +1111,7 @@ <para role="example-prefix"> For example: </para> - <screen role="example">rabbitmqctl list_connections send_pend server_port</screen> + <screen role="example">rabbitmqctl list_connections send_pend port</screen> <para role="example"> This command displays the send queue size and server port for each connection. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index d3808a54..1f25338d 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -31,4 +31,7 @@ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, {cluster_nodes, []}, {server_properties, []}, - {collect_statistics, none}]}]}. + {collect_statistics, none}, + {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, + {auth_backends, [rabbit_auth_backend_internal]}, + {delegate_count, 16}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index fccfad97..81c3996b 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -29,7 +29,13 @@ %% Contributor(s): ______________________________________. %% --record(user, {username, password_hash, is_admin}). +-record(user, {username, + is_admin, + auth_backend, %% Module this user came from + impl %% Scratch space for that module + }). + +-record(internal_user, {username, password_hash, is_admin}). -record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). @@ -51,7 +57,7 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, arguments}). +-record(exchange, {name, type, durable, auto_delete, internal, arguments}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl new file mode 100644 index 00000000..a96c18d8 --- /dev/null +++ b/include/rabbit_auth_backend_spec.hrl @@ -0,0 +1,46 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-ifdef(use_specs). + +-spec(description/0 :: () -> [{atom(), any()}]). + +-spec(check_user_login/2 :: (rabbit_types:username(), [term()]) -> + {'ok', rabbit_types:user()} | + {'refused', string(), [any()]} | + {'error', any()}). +-spec(check_vhost_access/3 :: (rabbit_types:user(), rabbit_types:vhost(), + rabbit_access_control:vhost_permission_atom()) -> + boolean() | {'error', any()}). +-spec(check_resource_access/3 :: (rabbit_types:user(), + rabbit_types:r(atom()), + rabbit_access_control:permission_atom()) -> + boolean() | {'error', any()}). +-endif. diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl new file mode 100644 index 00000000..f8dc93fe --- /dev/null +++ b/include/rabbit_auth_mechanism_spec.hrl @@ -0,0 +1,41 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-ifdef(use_specs). + +-spec(description/0 :: () -> [{atom(), any()}]). +-spec(init/1 :: (rabbit_net:socket()) -> any()). +-spec(handle_response/2 :: (binary(), any()) -> + {'ok', rabbit_types:user()} | + {'challenge', binary(), any()} | + {'protocol_error', string(), [any()]} | + {'refused', string(), [any()]}). + +-endif. diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index f67c6f46..6fa34ccc 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -58,7 +58,7 @@ (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). --spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). +-spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index 36734874..8cb470d0 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -48,6 +48,8 @@ done SCRIPT_DIR=`dirname $SCRIPT_PATH` RABBITMQ_HOME="${SCRIPT_DIR}/.." +[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname` +NODENAME=rabbit@${HOSTNAME%%.*} # Load configuration from the rabbitmq.conf file [ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 59050692..33883702 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -29,8 +29,7 @@ ## ## Contributor(s): ______________________________________. ## -[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` -NODENAME=rabbit@${HOSTNAME%%.*} + SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index a4f8c8b4..ec61dc99 100644 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -89,7 +89,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( -pa "!TDP0!..\ebin" ^
-noinput -hidden ^
!RABBITMQ_MULTI_ERL_ARGS! ^
--sname rabbitmq_multi ^
+-sname rabbitmq_multi!RANDOM! ^
!RABBITMQ_CONFIG_ARG! ^
-s rabbit_multi ^
!RABBITMQ_MULTI_START_ARGS! ^
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 66ce4384..4155b31d 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -30,8 +30,6 @@ ## Contributor(s): ______________________________________. ## -[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` -NODENAME=rabbit@${HOSTNAME%%.*} SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ -kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" @@ -92,6 +90,7 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then -noinput \ -hidden \ -s rabbit_prelaunch \ + -sname rabbitmqprelaunch$$ \ -extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}" then RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 872c87e3..ec5b4d85 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -118,6 +118,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
+-sname rabbitmqprelaunch!RANDOM! ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 76ce25fd..56cff891 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,9 +30,6 @@ ## Contributor(s): ______________________________________. ## -[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` -NODENAME=rabbit@${HOSTNAME%%.*} - . `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 563b9e58..4ffde73f 100644 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -58,7 +58,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/delegate.erl b/src/delegate.erl index 11abe73b..10054e57 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -31,11 +31,9 @@ -module(delegate). --define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2). - -behaviour(gen_server2). --export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]). +-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,13 +42,16 @@ -ifdef(use_specs). --spec(start_link/2 :: - (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). --spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). +-spec(invoke/2 :: + ( pid(), fun ((pid()) -> A)) -> A; + ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], + [{pid(), term()}]}). --spec(process_count/0 :: () -> non_neg_integer()). +-spec(delegate_count/0 :: () -> non_neg_integer()). -endif. @@ -61,157 +62,113 @@ %%---------------------------------------------------------------------------- -start_link(Prefix, Hash) -> - gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []). +start_link(Num) -> + gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []). +invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> + Fun(Pid); invoke(Pid, Fun) when is_pid(Pid) -> - [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), - case Res of - {ok, Result, _} -> + case invoke([Pid], Fun) of + {[{Pid, Result}], []} -> Result; - {error, {Class, Reason, StackTrace}, _} -> + {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; invoke(Pids, Fun) when is_list(Pids) -> - lists:foldl( - fun ({Status, Result, Pid}, {Good, Bad}) -> - case Status of - ok -> {[{Pid, Result}|Good], Bad}; - error -> {Good, [{Pid, Result}|Bad]} - end + {LocalPids, Grouped} = group_pids_by_node(Pids), + %% The use of multi_call is only safe because the timeout is + %% infinity, and thus there is no process spawned in order to do + %% the sending. Thus calls can't overtake preceding calls/casts. + {Replies, BadNodes} = + case orddict:fetch_keys(Grouped) of + [] -> {[], []}; + RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(), + {invoke, Fun, Grouped}, + infinity) end, - {[], []}, - invoke_per_node(split_delegate_per_node(Pids), Fun)). + BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || + BadNode <- BadNodes, + Pid <- orddict:fetch(BadNode, Grouped)], + ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + [Results || {_Node, Results} <- Replies]]), + lists:foldl( + fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; + ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} + end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), +invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, Fun), %% we don't care about any error ok; +invoke_no_result(Pid, Fun) when is_pid(Pid) -> + invoke_no_result([Pid], Fun); invoke_no_result(Pids, Fun) when is_list(Pids) -> - invoke_no_result_per_node(split_delegate_per_node(Pids), Fun), + {LocalPids, Grouped} = group_pids_by_node(Pids), + case orddict:fetch_keys(Grouped) of + [] -> ok; + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(), + {invoke, Fun, Grouped}) + end, + safe_invoke(LocalPids, Fun), %% must not die ok. %%---------------------------------------------------------------------------- -internal_call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity). - -internal_cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). - -split_delegate_per_node(Pids) -> +group_pids_by_node(Pids) -> LocalNode = node(), - {Local, Remote} = - lists:foldl( - fun (Pid, {L, D}) -> - Node = node(Pid), - case Node of - LocalNode -> {[Pid|L], D}; - _ -> {L, orddict:append(Node, Pid, D)} - end - end, - {[], orddict:new()}, Pids), - {Local, orddict:to_list(Remote)}. - -invoke_per_node(NodePids, Fun) -> - lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). - -invoke_no_result_per_node(NodePids, Fun) -> - delegate_per_node(NodePids, Fun, fun internal_cast/2), - ok. - -delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> - %% In the case where DelegateFun is internal_cast, the safe_invoke - %% is not actually async! However, in practice Fun will always be - %% something that does a gen_server:cast or similar, so I don't - %% think it's a problem unless someone misuses this - %% function. Making this *actually* async would be painful as we - %% can't spawn at this point or we break effect ordering. - [safe_invoke(LocalPids, Fun)| - delegate_per_remote_node(NodePids, Fun, DelegateFun)]. - -delegate_per_remote_node(NodePids, Fun, DelegateFun) -> - Self = self(), - %% Note that this is unsafe if the Fun requires reentrancy to the - %% local_server. I.e. if self() == local_server(Node) then we'll - %% block forever. - [gen_server2:cast( - local_server(Node), - {thunk, fun () -> - Self ! {result, - DelegateFun( - Node, fun () -> safe_invoke(Pids, Fun) end)} - end}) || {Node, Pids} <- NodePids], - [receive {result, Result} -> Result end || _ <- NodePids]. - -local_server(Node) -> - case get({delegate_local_server_name, Node}) of - undefined -> - Name = server(outgoing, - erlang:phash2({self(), Node}, process_count())), - put({delegate_local_server_name, Node}, Name), - Name; - Name -> Name - end. - -remote_server(Node) -> - case get({delegate_remote_server_name, Node}) of - undefined -> - case rpc:call(Node, delegate, process_count, []) of - {badrpc, _} -> - %% Have to return something, if we're just casting - %% then we don't want to blow up - server(incoming, 1); - Count -> - Name = server(incoming, - erlang:phash2({self(), Node}, Count)), - put({delegate_remote_server_name, Node}, Name), - Name - end; - Name -> Name + lists:foldl( + fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode -> + {[Pid | Local], Remote}; + (Pid, {Local, Remote}) -> + {Local, + orddict:update( + node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} + end, {[], orddict:new()}, Pids). + +delegate_count() -> + {ok, Count} = application:get_env(rabbit, delegate_count), + Count. + +delegate_name(Hash) -> + list_to_atom("delegate_" ++ integer_to_list(Hash)). + +delegate() -> + case get(delegate) of + undefined -> Name = delegate_name( + erlang:phash2(self(), delegate_count())), + put(delegate, Name), + Name; + Name -> Name end. -server(Prefix, Hash) -> - list_to_atom("delegate_" ++ - atom_to_list(Prefix) ++ "_" ++ - integer_to_list(Hash)). - safe_invoke(Pids, Fun) when is_list(Pids) -> [safe_invoke(Pid, Fun) || Pid <- Pids]; safe_invoke(Pid, Fun) when is_pid(Pid) -> try - {ok, Fun(Pid), Pid} - catch - Class:Reason -> - {error, {Class, Reason, erlang:get_stacktrace()}, Pid} + {ok, Pid, Fun(Pid)} + catch Class:Reason -> + {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. -process_count() -> - ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers). - -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- init([]) -> - {ok, no_state, hibernate, + {ok, node(), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -%% We don't need a catch here; we always go via safe_invoke. A catch here would -%% be the wrong thing anyway since the Thunk can throw multiple errors. -handle_call({thunk, Thunk}, _From, State) -> - {reply, Thunk(), State, hibernate}. +handle_call({invoke, Fun, Grouped}, _From, Node) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}. -handle_cast({thunk, Thunk}, State) -> - Thunk(), - {noreply, State, hibernate}. +handle_cast({invoke, Fun, Grouped}, Node) -> + safe_invoke(orddict:fetch(Node, Grouped), Fun), + {noreply, Node, hibernate}. -handle_info(_Info, State) -> - {noreply, State, hibernate}. +handle_info(_Info, Node) -> + {noreply, Node, hibernate}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- +code_change(_OldVsn, Node, _Extra) -> + {ok, Node}. diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 544546f1..d2af72af 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -55,11 +55,8 @@ start_link() -> %%---------------------------------------------------------------------------- init(_Args) -> - {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}. - -specs(Prefix) -> - [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]}, - transient, 16#ffffffff, worker, [delegate]} || - Hash <- lists:seq(0, delegate:process_count() - 1)]. - -%%---------------------------------------------------------------------------- + DCount = delegate:delegate_count(), + {ok, {{one_for_one, 10, 10}, + [{Num, {delegate, start_link, [Num]}, + transient, 16#ffffffff, worker, [delegate]} || + Num <- lists:seq(0, DCount - 1)]}}. diff --git a/src/rabbit.erl b/src/rabbit.erl index ace8f286..954e289b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -69,10 +69,10 @@ -rabbit_boot_step({external_infrastructure, [{description, "external infrastructure ready"}]}). --rabbit_boot_step({rabbit_exchange_type_registry, - [{description, "exchange type registry"}, +-rabbit_boot_step({rabbit_registry, + [{description, "plugin registry"}, {mfa, {rabbit_sup, start_child, - [rabbit_exchange_type_registry]}}, + [rabbit_registry]}}, {requires, external_infrastructure}, {enables, kernel_ready}]}). @@ -458,16 +458,16 @@ insert_default_data() -> {ok, DefaultVHost} = application:get_env(default_vhost), {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = application:get_env(default_permissions), - ok = rabbit_access_control:add_vhost(DefaultVHost), - ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), + ok = rabbit_vhost:add(DefaultVHost), + ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass), case DefaultAdmin of - true -> rabbit_access_control:set_admin(DefaultUser); + true -> rabbit_auth_backend_internal:set_admin(DefaultUser); _ -> ok end, - ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, - DefaultConfigurePerm, - DefaultWritePerm, - DefaultReadPerm), + ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost, + DefaultConfigurePerm, + DefaultWritePerm, + DefaultReadPerm), ok. rotate_logs(File, Suffix, Handler) -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index bc588013..02a65442 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -30,419 +30,123 @@ %% -module(rabbit_access_control). --include_lib("stdlib/include/qlc.hrl"). + -include("rabbit.hrl"). --export([check_login/2, user_pass_login/2, check_user_pass_login/2, - check_vhost_access/2, check_resource_access/3]). --export([add_user/2, delete_user/1, change_password/2, set_admin/1, - clear_admin/1, list_users/0, lookup_user/1]). --export([change_password_hash/2, hash_password/1]). --export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). --export([set_permissions/5, clear_permissions/2, - list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, - list_user_vhost_permissions/2]). +-export([user_pass_login/2, check_user_pass_login/2, check_user_login/2, + check_vhost_access/2, check_resource_access/3, list_vhosts/2]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([username/0, password/0, password_hash/0]). +-export_type([permission_atom/0, vhost_permission_atom/0]). -type(permission_atom() :: 'configure' | 'read' | 'write'). --type(username() :: binary()). --type(password() :: binary()). --type(password_hash() :: binary()). --type(regexp() :: binary()). --spec(check_login/2 :: - (binary(), binary()) -> rabbit_types:user() | - rabbit_types:channel_exit()). +-type(vhost_permission_atom() :: 'read' | 'write'). + -spec(user_pass_login/2 :: - (username(), password()) + (rabbit_types:username(), rabbit_types:password()) -> rabbit_types:user() | rabbit_types:channel_exit()). -spec(check_user_pass_login/2 :: - (username(), password()) - -> {'ok', rabbit_types:user()} | 'refused'). + (rabbit_types:username(), rabbit_types:password()) + -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). -spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> 'ok' | rabbit_types:channel_exit()). -spec(check_resource_access/3 :: - (username(), rabbit_types:r(atom()), permission_atom()) + (rabbit_types:user(), rabbit_types:r(atom()), permission_atom()) -> 'ok' | rabbit_types:channel_exit()). --spec(add_user/2 :: (username(), password()) -> 'ok'). --spec(delete_user/1 :: (username()) -> 'ok'). --spec(change_password/2 :: (username(), password()) -> 'ok'). --spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok'). --spec(hash_password/1 :: (password()) -> password_hash()). --spec(set_admin/1 :: (username()) -> 'ok'). --spec(clear_admin/1 :: (username()) -> 'ok'). --spec(list_users/0 :: () -> [{username(), boolean()}]). --spec(lookup_user/1 :: - (username()) -> rabbit_types:ok(rabbit_types:user()) - | rabbit_types:error('not_found')). --spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). --spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). --spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()). --spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). --spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), - regexp(), regexp()) -> 'ok'). --spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). --spec(list_permissions/0 :: - () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]). --spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]). --spec(list_user_permissions/1 :: - (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). --spec(list_user_vhost_permissions/2 :: - (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]). +-spec(list_vhosts/2 :: (rabbit_types:user(), vhost_permission_atom()) + -> [rabbit_types:vhost()]). -endif. %%---------------------------------------------------------------------------- -%% SASL PLAIN, as used by the Qpid Java client and our clients. Also, -%% apparently, by OpenAMQ. -check_login(<<"PLAIN">>, Response) -> - [User, Pass] = [list_to_binary(T) || - T <- string:tokens(binary_to_list(Response), [0])], - user_pass_login(User, Pass); -%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually -%% defines this as PLAIN, but in 0-9 that definition is gone, instead -%% referring generically to "SASL security mechanism", i.e. the above. -check_login(<<"AMQPLAIN">>, Response) -> - LoginTable = rabbit_binary_parser:parse_table(Response), - case {lists:keysearch(<<"LOGIN">>, 1, LoginTable), - lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of - {{value, {_, longstr, User}}, - {value, {_, longstr, Pass}}} -> - user_pass_login(User, Pass); - _ -> - %% Is this an information leak? - rabbit_misc:protocol_error( - access_refused, - "AMQPPLAIN auth info ~w is missing LOGIN or PASSWORD field", - [LoginTable]) - end; - -check_login(Mechanism, _Response) -> - rabbit_misc:protocol_error( - access_refused, "unsupported authentication mechanism '~s'", - [Mechanism]). - user_pass_login(User, Pass) -> ?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]), case check_user_pass_login(User, Pass) of - refused -> + {refused, Msg, Args} -> rabbit_misc:protocol_error( - access_refused, "login refused for user '~s'", [User]); + access_refused, "login refused: ~s", [io_lib:format(Msg, Args)]); {ok, U} -> U end. -check_user_pass_login(User, Pass) -> - case lookup_user(User) of - {ok, U} -> - case check_password(Pass, U#user.password_hash) of - true -> {ok, U}; - _ -> refused - end; - {error, not_found} -> - refused - end. - -internal_lookup_vhost_access(Username, VHostPath) -> - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHostPath}}) of - [] -> not_found; - [R] -> {ok, R} - end - end). - -check_vhost_access(#user{username = Username}, VHostPath) -> +check_user_pass_login(Username, Password) -> + check_user_login(Username, [{password, Password}]). + +check_user_login(Username, AuthProps) -> + {ok, Modules} = application:get_env(rabbit, auth_backends), + lists:foldl( + fun(Module, {refused, _, _}) -> + case Module:check_user_login(Username, AuthProps) of + {error, E} -> + {refused, "~s failed authenticating ~s: ~p~n", + [Module, Username, E]}; + Else -> + Else + end; + (_, {ok, User}) -> + {ok, User} + end, {refused, "No modules checked '~s'", [Username]}, Modules). + +check_vhost_access(User = #user{ username = Username, + auth_backend = Module }, VHostPath) -> ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]), - case internal_lookup_vhost_access(Username, VHostPath) of - {ok, _R} -> - ok; - not_found -> - rabbit_misc:protocol_error( - access_refused, "access to vhost '~s' refused for user '~s'", - [VHostPath, Username]) - end. - -permission_index(configure) -> #permission.configure; -permission_index(write) -> #permission.write; -permission_index(read) -> #permission.read. - -check_resource_access(Username, - R = #resource{kind = exchange, name = <<"">>}, + check_access( + fun() -> + rabbit_vhost:exists(VHostPath) andalso + Module:check_vhost_access(User, VHostPath, write) + end, + "~s failed checking vhost access to ~s for ~s: ~p~n", + [Module, VHostPath, Username], + "access to vhost '~s' refused for user '~s'", + [VHostPath, Username]). + +check_resource_access(User, R = #resource{kind = exchange, name = <<"">>}, Permission) -> - check_resource_access(Username, - R#resource{name = <<"amq.default">>}, + check_resource_access(User, R#resource{name = <<"amq.default">>}, Permission); -check_resource_access(Username, - R = #resource{virtual_host = VHostPath, name = Name}, - Permission) -> - Res = case mnesia:dirty_read({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHostPath}}) of - [] -> - false; - [#user_permission{permission = P}] -> - PermRegexp = - case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; - RE -> RE - end, - case re:run(Name, PermRegexp, [{capture, none}]) of - match -> true; - nomatch -> false - end - end, - if Res -> ok; - true -> rabbit_misc:protocol_error( - access_refused, "access to ~s refused for user '~s'", - [rabbit_misc:rs(R), Username]) - end. - -add_user(Username, Password) -> - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_user, Username}) of - [] -> - ok = mnesia:write(rabbit_user, - #user{username = Username, - password_hash = - hash_password(Password), - is_admin = false}, - write); - _ -> - mnesia:abort({user_already_exists, Username}) - end - end), - rabbit_log:info("Created user ~p~n", [Username]), - R. - -delete_user(Username) -> - R = rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - ok = mnesia:delete({rabbit_user, Username}), - [ok = mnesia:delete_object( - rabbit_user_permission, R, write) || - R <- mnesia:match_object( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = '_'}, - permission = '_'}, - write)], - ok - end)), - rabbit_log:info("Deleted user ~p~n", [Username]), - R. - -change_password(Username, Password) -> - change_password_hash(Username, hash_password(Password)). - -change_password_hash(Username, PasswordHash) -> - R = update_user(Username, fun(User) -> - User#user{ password_hash = PasswordHash } - end), - rabbit_log:info("Changed password for user ~p~n", [Username]), - R. - -hash_password(Cleartext) -> - Salt = make_salt(), - Hash = salted_md5(Salt, Cleartext), - <<Salt/binary, Hash/binary>>. - -check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) -> - Hash =:= salted_md5(Salt, Cleartext). - -make_salt() -> - {A1,A2,A3} = now(), - random:seed(A1, A2, A3), - Salt = random:uniform(16#ffffffff), - <<Salt:32>>. - -salted_md5(Salt, Cleartext) -> - Salted = <<Salt/binary, Cleartext/binary>>, - erlang:md5(Salted). - -set_admin(Username) -> - set_admin(Username, true). - -clear_admin(Username) -> - set_admin(Username, false). - -set_admin(Username, IsAdmin) -> - R = update_user(Username, fun(User) -> - User#user{is_admin = IsAdmin} - end), - rabbit_log:info("Set user admin flag for user ~p to ~p~n", - [Username, IsAdmin]), - R. - -update_user(Username, Fun) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - {ok, User} = lookup_user(Username), - ok = mnesia:write(rabbit_user, Fun(User), write) - end)). - -list_users() -> - [{Username, IsAdmin} || - #user{username = Username, is_admin = IsAdmin} <- - mnesia:dirty_match_object(rabbit_user, #user{_ = '_'})]. - -lookup_user(Username) -> - rabbit_misc:dirty_read({rabbit_user, Username}). - -add_vhost(VHostPath) -> - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_vhost, VHostPath}) of - [] -> - ok = mnesia:write(rabbit_vhost, - #vhost{virtual_host = VHostPath}, - write), - [rabbit_exchange:declare( - rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}]], - ok; - [_] -> - mnesia:abort({vhost_already_exists, VHostPath}) - end - end), - rabbit_log:info("Added vhost ~p~n", [VHostPath]), - R. - -delete_vhost(VHostPath) -> - %%FIXME: We are forced to delete the queues outside the TX below - %%because queue deletion involves sending messages to the queue - %%process, which in turn results in further mnesia actions and - %%eventually the termination of that process. - lists:foreach(fun (Q) -> - {ok,_} = rabbit_amqqueue:delete(Q, false, false) - end, - rabbit_amqqueue:list(VHostPath)), - R = rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_vhost( - VHostPath, - fun () -> - ok = internal_delete_vhost(VHostPath) - end)), - rabbit_log:info("Deleted vhost ~p~n", [VHostPath]), - R. - -internal_delete_vhost(VHostPath) -> - lists:foreach(fun (#exchange{name = Name}) -> - ok = rabbit_exchange:delete(Name, false) - end, - rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _, _}) -> - ok = clear_permissions(Username, VHostPath) - end, - list_vhost_permissions(VHostPath)), - ok = mnesia:delete({rabbit_vhost, VHostPath}), - ok. - -vhost_exists(VHostPath) -> - mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. - -list_vhosts() -> - mnesia:dirty_all_keys(rabbit_vhost). - -validate_regexp(RegexpBin) -> - Regexp = binary_to_list(RegexpBin), - case re:compile(Regexp) of - {ok, _} -> ok; - {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) +check_resource_access(User = #user{username = Username, auth_backend = Module}, + Resource, Permission) -> + check_access( + fun() -> Module:check_resource_access(User, Resource, Permission) end, + "~s failed checking resource access to ~p for ~s: ~p~n", + [Module, Resource, Username], + "access to ~s refused for user '~s'", + [rabbit_misc:rs(Resource), Username]). + +check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) -> + Allow = case Fun() of + {error, _} = E -> + rabbit_log:error(ErrStr, ErrArgs ++ [E]), + false; + Else -> + Else + end, + case Allow of + true -> + ok; + false -> + rabbit_misc:protocol_error(access_refused, RefStr, RefArgs) end. -set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> - lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user_and_vhost( - Username, VHostPath, - fun () -> ok = mnesia:write( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = VHostPath}, - permission = #permission{ - configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}}, - write) - end)). - - -clear_permissions(Username, VHostPath) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user_and_vhost( - Username, VHostPath, - fun () -> - ok = mnesia:delete({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHostPath}}) - end)). - -list_permissions() -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || - {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- - list_permissions(match_user_vhost('_', '_'))]. - -list_vhost_permissions(VHostPath) -> - [{Username, ConfigurePerm, WritePerm, ReadPerm} || - {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- - list_permissions(rabbit_misc:with_vhost( - VHostPath, match_user_vhost('_', VHostPath)))]. - -list_user_permissions(Username) -> - [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || - {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- - list_permissions(rabbit_misc:with_user( - Username, match_user_vhost(Username, '_')))]. - -list_user_vhost_permissions(Username, VHostPath) -> - [{ConfigurePerm, WritePerm, ReadPerm} || - {_, _, ConfigurePerm, WritePerm, ReadPerm} <- - list_permissions(rabbit_misc:with_user_and_vhost( - Username, VHostPath, - match_user_vhost(Username, VHostPath)))]. - -list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || - #user_permission{user_vhost = #user_vhost{username = Username, - virtual_host = VHostPath}, - permission = #permission{ configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction(QueryThunk)]. - -match_user_vhost(Username, VHostPath) -> - fun () -> mnesia:match_object( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = VHostPath}, - permission = '_'}, - read) - end. +%% Permission = write -> log in +%% Permission = read -> learn of the existence of (only relevant for +%% management plugin) +list_vhosts(User = #user{username = Username, auth_backend = Module}, + Permission) -> + lists:filter( + fun(VHost) -> + case Module:check_vhost_access(User, VHost, Permission) of + {error, _} = E -> + rabbit_log:warning("~w failed checking vhost access " + "to ~s for ~s: ~p~n", + [Module, VHost, Username, E]), + false; + Else -> + Else + end + end, rabbit_vhost:list()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 775c631d..35ed1c94 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -152,9 +152,9 @@ (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit()). -spec(maybe_run_queue_via_backing_queue/2 :: - (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok'). + (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(maybe_run_queue_via_backing_queue_async/2 :: - (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok'). + (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -503,19 +503,17 @@ pseudo_queue(QueueName, Pid) -> pid = Pid}. safe_delegate_call_ok(F, Pids) -> - {_, Bad} = delegate:invoke(Pids, - fun (Pid) -> + case delegate:invoke(Pids, fun (Pid) -> rabbit_misc:with_exit_handler( fun () -> ok end, fun () -> F(Pid) end) - end), - case Bad of - [] -> ok; - _ -> {error, Bad} + end) of + {_, []} -> ok; + {_, Bad} -> {error, Bad} end. delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_cast(Pid, Msg) -> - delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). + delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 78bb6835..38b83117 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -199,6 +199,8 @@ terminate_shutdown(Fun, State) -> BQ:tx_rollback(Txn, BQSN), BQSN1 end, BQS, all_ch_record()), + [emit_consumer_deleted(Ch, CTag) + || {Ch, CTag, _} <- consumers(State1)], rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -226,7 +228,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, maybe_run_queue_via_backing_queue, - [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]), + [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -372,12 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - {State2, ChAckTags1} = + ChAckTags1 = case AckRequired of - true -> {State1, - sets:add_element(AckTag, ChAckTags)}; - false -> {confirm_message(Message, State1), - ChAckTags} + true -> sets:add_element(AckTag, ChAckTags); + false -> ChAckTags end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, @@ -394,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State3 = State2#q{ + State2 = State1#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State3); + deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> true = maybe_store_ch_record(C#cr{is_limit_active = true}), @@ -425,22 +425,36 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -confirm_messages(Guids, State) -> - lists:foldl(fun confirm_message_by_guid/2, State, Guids). - -confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> - case dict:find(Guid, GTC) of - {ok, {_ , undefined}} -> ok; - {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok +confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> + {CMs, GTC1} = + lists:foldl( + fun(Guid, {CMs, GTC0}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)}; + _ -> + {CMs, GTC0} + end + end, {[], GTC}, Guids), + case lists:usort(CMs) of + [{Ch, MsgSeqNo} | CMs1] -> + [rabbit_channel:confirm(ChPid, MsgSeqNos) || + {ChPid, MsgSeqNos} <- group_confirms_by_channel( + CMs1, [{Ch, [MsgSeqNo]}])]; + [] -> + ok end, - State#q{guid_to_channel = dict:erase(Guid, GTC)}. + State#q{guid_to_channel = GTC1}. -confirm_message(#basic_message{guid = Guid}, State) -> - confirm_message_by_guid(Guid, State). +group_confirms_by_channel([], Acc) -> + Acc; +group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]); +group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]). record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> - State; + {no_confirm, State}; record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { @@ -449,14 +463,10 @@ record_confirm_message(#delivery{sender = ChPid, State = #q{guid_to_channel = GTC, q = #amqqueue{durable = true}}) -> - State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}; + {confirm, + State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}}; record_confirm_message(_Delivery, State) -> - State. - -ack_by_acktags(AckTags, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckdGuids, BQS1} = BQ:ack(AckTags, BQS), - confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}). + {no_confirm, State}. run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, @@ -471,12 +481,12 @@ attempt_delivery(#delivery{txn = none, sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, - State = #q{backing_queue = BQ, q = Q}) -> - NeedsConfirming = Message#basic_message.is_persistent andalso - Q#amqqueue.durable, - case NeedsConfirming of - false -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok + {NeedsConfirming, State = #q{backing_queue = BQ}}) -> + %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming + case {NeedsConfirming, MsgSeqNo} of + {_, undefined} -> ok; + {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); + {confirm, _} -> ok end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -488,31 +498,37 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered( AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = NeedsConfirming}, + needs_confirming = (NeedsConfirming =:= confirm)}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); + {Delivered, State1} = + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), + {Delivered, NeedsConfirming, State1}; attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + {NeedsConfirming, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> record_current_channel_tx(ChPid, Txn), {true, + NeedsConfirming, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {true, State1} -> + {true, _, State1} -> {true, State1}; - {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery, + {false, NeedsConfirming, State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}} -> + #delivery{message = Message} = Delivery, BQS1 = BQ:publish(Message, (message_properties(State)) #message_properties{ - needs_confirming = (MsgSeqNo =/= undefined)}, + needs_confirming = + (NeedsConfirming =:= confirm)}, BQS), {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} end. @@ -520,7 +536,7 @@ deliver_or_enqueue(Delivery, State) -> requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> maybe_run_queue_via_backing_queue( fun (BQS) -> - BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) + {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)} end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, @@ -536,12 +552,19 @@ remove_consumer(ChPid, ConsumerTag, Queue) -> end, Queue). remove_consumers(ChPid, Queue) -> - queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue). + {Kept, Removed} = split_by_channel(ChPid, Queue), + [emit_consumer_deleted(Ch, CTag) || + {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)], + Kept. move_consumers(ChPid, From, To) -> + {Kept, Removed} = split_by_channel(ChPid, From), + {Kept, queue:join(To, Removed)}. + +split_by_channel(ChPid, Queue) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(From)), - {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. + queue:to_list(Queue)), + {queue:from_list(Kept), queue:from_list(Removed)}. possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -617,12 +640,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {BQS2, State1} = - case Fun(BQS) of - {{confirm, Guids}, BQS1} -> {BQS1, confirm_messages(Guids, State)}; - BQS1 -> {BQS1, State} - end, - run_message_queue(State1#q{backing_queue_state = BQS2}). + {Guids, BQS1} = Fun(BQS), + run_message_queue( + confirm_messages(Guids, State#q{backing_queue_state = BQS1})). commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS, @@ -724,12 +744,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). +consumers(#q{active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> + rabbit_misc:queue_fold( + fun ({ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, Acc) -> + [{ChPid, ConsumerTag, AckRequired} | Acc] + end, [], queue:join(ActiveConsumers, BlockedConsumers)). + emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). +emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> + rabbit_event:notify(consumer_created, + [{consumer_tag, ConsumerTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, self()}]). + +emit_consumer_deleted(ChPid, ConsumerTag) -> + rabbit_event:notify(consumer_deleted, + [{consumer_tag, ConsumerTag}, + {channel, ChPid}, + {queue, self()}]). + %--------------------------------------------------------------------------- prioritise_call(Msg, _From, _State) -> @@ -743,18 +785,19 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - delete_immediately -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _MsgIds, _ChPid} -> 7; - {reject, _MsgIds, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; - {unblock, _ChPid} -> 7; - _ -> 0 + update_ram_duration -> 8; + delete_immediately -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + {ack, _Txn, _MsgIds, _ChPid} -> 7; + {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + {maybe_run_queue_via_backing_queue, _Fun} -> 6; + _ -> 0 end. prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, @@ -792,16 +835,10 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, - State = #q{active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> - reply(rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}, Acc) -> - [{ChPid, ConsumerTag, AckRequired} | Acc] - end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); +handle_call(consumers, _From, State) -> + reply(consumers(State), State); -handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, +handle_call({deliver_immediately, Delivery}, _From, State) -> %% Synchronous, "immediate" delivery mode %% @@ -816,17 +853,15 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, State1} = + {Delivered, _NeedsConfirming, State1} = attempt_delivery(Delivery, record_confirm_message(Delivery, State)), - reply(Delivered, case Delivered of - true -> State1; - false -> confirm_message(Message, State1) - end); + reply(Delivered, State1); -handle_call({deliver, Delivery}, _From, State) -> - %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Delivery, State), - reply(Delivered, NewState); +handle_call({deliver, Delivery}, From, State) -> + %% Synchronous, "mandatory" delivery mode. Reply asap. + gen_server2:reply(From, true), + {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), + noreply(NewState); handle_call({commit, Txn, ChPid}, From, State) -> NewState = commit_transaction(Txn, From, ChPid, State), @@ -859,7 +894,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, sets:add_element(AckTag, ChAckTags)}), State2; - false -> confirm_message(Message, State2) + false -> State2 end, Msg = {QName, self(), AckTag, IsDelivered, Message}, reply({ok, Remaining, Msg}, State3) @@ -902,6 +937,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ChPid, Consumer, State1#q.active_consumers)}) end, + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck), reply(ok, State2) end; @@ -920,6 +957,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, C1#cr{limiter_pid = undefined}; _ -> C1 end), + emit_consumer_deleted(ChPid, ConsumerTag), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, @@ -994,8 +1032,8 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - NewState = ack_by_acktags(AckTags, State), - {NewC, NewState}; + BQS1 = BQ:ack(AckTags, BQS), + {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, State#q{backing_queue_state = BQS1}} @@ -1004,7 +1042,9 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State1) end; -handle_cast({reject, AckTags, Requeue, ChPid}, State) -> +handle_cast({reject, AckTags, Requeue, ChPid}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -1013,7 +1053,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> ack_by_acktags(AckTags, State) + false -> BQS1 = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1} end) end; @@ -1107,7 +1148,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:idle_timeout(BQS) end, State)); + fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl new file mode 100644 index 00000000..0dc8e61b --- /dev/null +++ b/src/rabbit_auth_backend.erl @@ -0,0 +1,76 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_backend). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + %% A description proplist as with auth mechanisms, + %% exchanges. Currently unused. + {description, 0}, + + %% Check a user can log in, given a username and a proplist of + %% authentication information (e.g. [{password, Password}]). + %% + %% Possible responses: + %% {ok, User} + %% Authentication succeeded, and here's the user record. + %% {error, Error} + %% Something went wrong. Log and die. + %% {refused, Msg, Args} + %% Client failed authentication. Log and die. + {check_user_login, 2}, + + %% Given #user, vhost path and permission, can a user access a vhost? + %% Permission is read - learn of the existence of (only relevant for + %% management plugin) + %% or write - log in + %% + %% Possible responses: + %% true + %% false + %% {error, Error} + %% Something went wrong. Log and die. + {check_vhost_access, 3}, + + %% Given #user, resource and permission, can a user access a resource? + %% + %% Possible responses: + %% true + %% false + %% {error, Error} + %% Something went wrong. Log and die. + {check_resource_access, 3} + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl new file mode 100644 index 00000000..233e2b90 --- /dev/null +++ b/src/rabbit_auth_backend_internal.erl @@ -0,0 +1,347 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_backend_internal). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_backend). + +-export([description/0]). +-export([check_user_login/2, check_vhost_access/3, check_resource_access/3]). + +-export([add_user/2, delete_user/1, change_password/2, set_admin/1, + clear_admin/1, list_users/0, lookup_user/1, clear_password/1]). +-export([make_salt/0, check_password/2, change_password_hash/2, + hash_password/1]). +-export([set_permissions/5, clear_permissions/2, + list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, + list_user_vhost_permissions/2]). + +-include("rabbit_auth_backend_spec.hrl"). + +-ifdef(use_specs). + +-type(regexp() :: binary()). + +-spec(add_user/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok'). +-spec(delete_user/1 :: (rabbit_types:username()) -> 'ok'). +-spec(change_password/2 :: (rabbit_types:username(), rabbit_types:password()) + -> 'ok'). +-spec(clear_password/1 :: (rabbit_types:username()) -> 'ok'). +-spec(make_salt/0 :: () -> binary()). +-spec(check_password/2 :: (rabbit_types:password(), + rabbit_types:password_hash()) -> boolean()). +-spec(change_password_hash/2 :: (rabbit_types:username(), + rabbit_types:password_hash()) -> 'ok'). +-spec(hash_password/1 :: (rabbit_types:password()) + -> rabbit_types:password_hash()). +-spec(set_admin/1 :: (rabbit_types:username()) -> 'ok'). +-spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok'). +-spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]). +-spec(lookup_user/1 :: (rabbit_types:username()) + -> rabbit_types:ok(rabbit_types:internal_user()) + | rabbit_types:error('not_found')). +-spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(), + regexp(), regexp(), regexp()) -> 'ok'). +-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) + -> 'ok'). +-spec(list_permissions/0 :: + () -> [{rabbit_types:username(), rabbit_types:vhost(), + regexp(), regexp(), regexp()}]). +-spec(list_vhost_permissions/1 :: + (rabbit_types:vhost()) -> [{rabbit_types:username(), + regexp(), regexp(), regexp()}]). +-spec(list_user_permissions/1 :: + (rabbit_types:username()) -> [{rabbit_types:vhost(), + regexp(), regexp(), regexp()}]). +-spec(list_user_vhost_permissions/2 :: + (rabbit_types:username(), rabbit_types:vhost()) + -> [{regexp(), regexp(), regexp()}]). + +-endif. + +%%---------------------------------------------------------------------------- + +%% Implementation of rabbit_auth_backend + +description() -> + [{name, <<"Internal">>}, + {description, <<"Internal user / password database">>}]. + +check_user_login(Username, []) -> + internal_check_user_login(Username, fun(_) -> true end); +check_user_login(Username, [{password, Password}]) -> + internal_check_user_login( + Username, + fun(#internal_user{password_hash = Hash}) -> + check_password(Password, Hash) + end); +check_user_login(Username, AuthProps) -> + exit({unknown_auth_props, Username, AuthProps}). + +internal_check_user_login(Username, Fun) -> + Refused = {refused, "user '~s' - invalid credentials", [Username]}, + case lookup_user(Username) of + {ok, User = #internal_user{is_admin = IsAdmin}} -> + case Fun(User) of + true -> {ok, #user{username = Username, + is_admin = IsAdmin, + auth_backend = ?MODULE, + impl = User}}; + _ -> Refused + end; + {error, not_found} -> + Refused + end. + +check_vhost_access(#user{is_admin = true}, _VHostPath, read) -> + true; + +check_vhost_access(#user{username = Username}, VHostPath, _) -> + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of + [] -> false; + [_R] -> true + end + end). + +check_resource_access(#user{username = Username}, + #resource{virtual_host = VHostPath, name = Name}, + Permission) -> + case mnesia:dirty_read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of + [] -> + false; + [#user_permission{permission = P}] -> + PermRegexp = + case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, + case re:run(Name, PermRegexp, [{capture, none}]) of + match -> true; + nomatch -> false + end + end. + +permission_index(configure) -> #permission.configure; +permission_index(write) -> #permission.write; +permission_index(read) -> #permission.read. + +%%---------------------------------------------------------------------------- +%% Manipulation of the user database + +add_user(Username, Password) -> + R = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_user, Username}) of + [] -> + ok = mnesia:write( + rabbit_user, + #internal_user{username = Username, + password_hash = + hash_password(Password), + is_admin = false}, + write); + _ -> + mnesia:abort({user_already_exists, Username}) + end + end), + rabbit_log:info("Created user ~p~n", [Username]), + R. + +delete_user(Username) -> + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + ok = mnesia:delete({rabbit_user, Username}), + [ok = mnesia:delete_object( + rabbit_user_permission, R, write) || + R <- mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = '_'}, + permission = '_'}, + write)], + ok + end)), + rabbit_log:info("Deleted user ~p~n", [Username]), + R. + +change_password(Username, Password) -> + change_password_hash(Username, hash_password(Password)). + +clear_password(Username) -> + change_password_hash(Username, <<"">>). + +change_password_hash(Username, PasswordHash) -> + R = update_user(Username, fun(User) -> + User#internal_user{ + password_hash = PasswordHash } + end), + rabbit_log:info("Changed password for user ~p~n", [Username]), + R. + +hash_password(Cleartext) -> + Salt = make_salt(), + Hash = salted_md5(Salt, Cleartext), + <<Salt/binary, Hash/binary>>. + +check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) -> + Hash =:= salted_md5(Salt, Cleartext). + +make_salt() -> + {A1,A2,A3} = now(), + random:seed(A1, A2, A3), + Salt = random:uniform(16#ffffffff), + <<Salt:32>>. + +salted_md5(Salt, Cleartext) -> + Salted = <<Salt/binary, Cleartext/binary>>, + erlang:md5(Salted). + +set_admin(Username) -> + set_admin(Username, true). + +clear_admin(Username) -> + set_admin(Username, false). + +set_admin(Username, IsAdmin) -> + R = update_user(Username, fun(User) -> + User#internal_user{is_admin = IsAdmin} + end), + rabbit_log:info("Set user admin flag for user ~p to ~p~n", + [Username, IsAdmin]), + R. + +update_user(Username, Fun) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + {ok, User} = lookup_user(Username), + ok = mnesia:write(rabbit_user, Fun(User), write) + end)). + +list_users() -> + [{Username, IsAdmin} || + #internal_user{username = Username, is_admin = IsAdmin} <- + mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. + +lookup_user(Username) -> + rabbit_misc:dirty_read({rabbit_user, Username}). + +validate_regexp(RegexpBin) -> + Regexp = binary_to_list(RegexpBin), + case re:compile(Regexp) of + {ok, _} -> ok; + {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) + end. + +set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> + lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> ok = mnesia:write( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}}, + write) + end)). + + +clear_permissions(Username, VHostPath) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> + ok = mnesia:delete({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) + end)). + +list_permissions() -> + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(match_user_vhost('_', '_'))]. + +list_vhost_permissions(VHostPath) -> + [{Username, ConfigurePerm, WritePerm, ReadPerm} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(rabbit_vhost:with( + VHostPath, match_user_vhost('_', VHostPath)))]. + +list_user_permissions(Username) -> + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(rabbit_misc:with_user( + Username, match_user_vhost(Username, '_')))]. + +list_user_vhost_permissions(Username, VHostPath) -> + [{ConfigurePerm, WritePerm, ReadPerm} || + {_, _, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(rabbit_misc:with_user_and_vhost( + Username, VHostPath, + match_user_vhost(Username, VHostPath)))]. + +list_permissions(QueryThunk) -> + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + #user_permission{user_vhost = #user_vhost{username = Username, + virtual_host = VHostPath}, + permission = #permission{ configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction(QueryThunk)]. + +match_user_vhost(Username, VHostPath) -> + fun () -> mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = '_'}, + read) + end. diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl new file mode 100644 index 00000000..ce1b16ac --- /dev/null +++ b/src/rabbit_auth_mechanism.erl @@ -0,0 +1,57 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + %% A description. + {description, 0}, + + %% Called before authentication starts. Should create a state + %% object to be passed through all the stages of authentication. + {init, 1}, + + %% Handle a stage of authentication. Possible responses: + %% {ok, User} + %% Authentication succeeded, and here's the user record. + %% {challenge, Challenge, NextState} + %% Another round is needed. Here's the state I want next time. + %% {protocol_error, Msg, Args} + %% Client got the protocol wrong. Log and die. + %% {refused, Msg, Args} + %% Client failed authentication. Log and die. + {handle_response, 2} + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl new file mode 100644 index 00000000..5d51d904 --- /dev/null +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -0,0 +1,70 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_amqplain). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism amqplain"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"AMQPLAIN">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually +%% defines this as PLAIN, but in 0-9 that definition is gone, instead +%% referring generically to "SASL security mechanism", i.e. the above. + +description() -> + [{name, <<"AMQPLAIN">>}, + {description, <<"QPid AMQPLAIN mechanism">>}]. + +init(_Sock) -> + []. + +handle_response(Response, _State) -> + LoginTable = rabbit_binary_parser:parse_table(Response), + case {lists:keysearch(<<"LOGIN">>, 1, LoginTable), + lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of + {{value, {_, longstr, User}}, + {value, {_, longstr, Pass}}} -> + rabbit_access_control:check_user_pass_login(User, Pass); + _ -> + {protocol_error, + "AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field", + [LoginTable]} + end. diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl new file mode 100644 index 00000000..67665928 --- /dev/null +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -0,0 +1,74 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_cr_demo). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism cr-demo"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"RABBIT-CR-DEMO">>, + ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +-record(state, {username = undefined}). + +%% Provides equivalent security to PLAIN but demos use of Connection.Secure(Ok) +%% START-OK: Username +%% SECURE: "Please tell me your password" +%% SECURE-OK: "My password is ~s", [Password] + +description() -> + [{name, <<"RABBIT-CR-DEMO">>}, + {description, <<"RabbitMQ Demo challenge-response authentication " + "mechanism">>}]. + +init(_Sock) -> + #state{}. + +handle_response(Response, State = #state{username = undefined}) -> + {challenge, <<"Please tell me your password">>, + State#state{username = Response}}; + +handle_response(Response, #state{username = Username}) -> + case Response of + <<"My password is ", Password/binary>> -> + rabbit_access_control:check_user_pass_login(Username, Password); + _ -> + {protocol_error, "Invalid response '~s'", [Response]} + end. diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl new file mode 100644 index 00000000..1c4e5c15 --- /dev/null +++ b/src/rabbit_auth_mechanism_external.erl @@ -0,0 +1,107 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_external). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-include_lib("public_key/include/public_key.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism external"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +-record(state, {username = undefined}). + +%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials +%% established by means external to the mechanism". We define that to +%% mean the peer certificate's subject's CN. + +description() -> + [{name, <<"EXTERNAL">>}, + {description, <<"SASL EXTERNAL authentication mechanism">>}]. + +init(Sock) -> + Username = case rabbit_net:peercert(Sock) of + {ok, C} -> + CN = case rabbit_ssl:peer_cert_subject_item( + C, ?'id-at-commonName') of + not_found -> {refused, "no CN found", []}; + CN0 -> list_to_binary(CN0) + end, + case config_sane() of + true -> CN; + false -> {refused, "configuration unsafe", []} + end; + {error, no_peercert} -> + {refused, "no peer certificate", []}; + nossl -> + {refused, "not SSL connection", []} + end, + #state{username = Username}. + +handle_response(_Response, #state{username = Username}) -> + case Username of + {refused, _, _} = E -> + E; + _ -> + case rabbit_access_control:check_user_login(Username, []) of + {ok, User} -> + {ok, User}; + {error, not_found} -> + %% This is not an information leak as we have to + %% have validated a client cert to get this far. + {refused, "user '~s' not found", [Username]} + end + end. + +%%-------------------------------------------------------------------------- + +config_sane() -> + {ok, Opts} = application:get_env(ssl_options), + case {proplists:get_value(fail_if_no_peer_cert, Opts), + proplists:get_value(verify, Opts)} of + {true, verify_peer} -> + true; + {F, V} -> + rabbit_log:warning("EXTERNAL mechanism disabled, " + "fail_if_no_peer_cert=~p; " + "verify=~p~n", [F, V]), + false + end. diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl new file mode 100644 index 00000000..e5f8f3e6 --- /dev/null +++ b/src/rabbit_auth_mechanism_plain.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_plain). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism plain"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"PLAIN">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +%% SASL PLAIN, as used by the Qpid Java client and our clients. Also, +%% apparently, by OpenAMQ. + +description() -> + [{name, <<"PLAIN">>}, + {description, <<"SASL PLAIN authentication mechanism">>}]. + +init(_Sock) -> + []. + +handle_response(Response, _State) -> + %% The '%%"' at the end of the next line is for Emacs + case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%" + [{capture, all_but_first, binary}]) of + {match, [User, Pass]} -> + rabbit_access_control:check_user_pass_login(User, Pass); + _ -> + {protocol_error, "response ~p invalid", [Response]} + end. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 352e76fd..8603d8d7 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -77,7 +77,7 @@ behaviour_info(callbacks) -> {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten - %% about. + %% about. Must return 1 guid per Ack, in the same order as Acks. {ack, 2}, %% A publish, but in the context of a transaction. diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index a5297a70..e81066da 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -344,8 +344,7 @@ lookup_amqp_exception(#amqp_error{name = Name, {ShouldClose, Code, ExplBin, Method}; lookup_amqp_exception(Other, Protocol) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = - Protocol:lookup_amqp_exception(internal_error, Protocol), + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), {ShouldClose, Code, Text, none}. amqp_exception_explanation(Text, Expl) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 668fb9bb..ccadf5af 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -305,7 +305,7 @@ table_for_resource(#resource{kind = queue}) -> rabbit_queue. %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + {ok, Module} = rabbit_registry:lookup_module(exchange, T), Module. contains(Table, MatchHead) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a1db2ccf..7b5f096b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,10 +35,10 @@ -behaviour(gen_server2). --export([start_link/7, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([start_link/7, do/2, do/3, flush/1, shutdown/1]). +-export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]). +-export([emit_stats/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -47,10 +47,9 @@ -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, most_recently_declared_queue, + user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, - held_confirms, unconfirmed, queues_for_msg}). + confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -72,8 +71,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -83,19 +80,21 @@ -type(channel_number() :: non_neg_integer()). -spec(start_link/7 :: - (channel_number(), pid(), pid(), rabbit_access_control:username(), + (channel_number(), pid(), pid(), rabbit_types:user(), rabbit_types:vhost(), pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -103,16 +102,14 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). --spec(flush_multiple_acks/1 :: (pid()) -> 'ok'). --spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, +start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, StartLimiterFun) -> - gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username, + gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> @@ -121,6 +118,9 @@ do(Pid, Method) -> do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content}). +flush(Pid) -> + gen_server2:call(Pid, flush). + shutdown(Pid) -> gen_server2:cast(Pid, terminate). @@ -133,6 +133,9 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). +confirm(Pid, MsgSeqNos) -> + gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). + list() -> pg_local:get_members(rabbit_channels). @@ -156,18 +159,9 @@ info_all(Items) -> emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). -flush(Pid) -> - gen_server2:call(Pid, flush). - -flush_multiple_acks(Pid) -> - gen_server2:cast(Pid, flush_multiple_acks). - -confirm(Pid, MsgSeqNo) -> - gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). - %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, +init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), @@ -183,7 +177,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, next_tag = 1, uncommitted_ack_q = queue:new(), unacked_message_q = queue:new(), - username = Username, + user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), @@ -191,9 +185,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, - publish_seqno = 0, - confirm_multiple = false, - held_confirms = gb_sets:new(), + publish_seqno = 1, unconfirmed = gb_sets:new(), queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -215,6 +207,9 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +handle_call(flush, _From, State) -> + reply(ok, State); + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -224,9 +219,6 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(flush, _From, State) -> - reply(ok, State); - handle_call(_Request, _From, State) -> noreply(State). @@ -261,10 +253,10 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> noreply(State); handle_cast({deliver, ConsumerTag, AckRequired, - Msg = {_QName, QPid, _MsgId, Redelivered, - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = Content}}}, + Msg = {_QName, QPid, _MsgId, Redelivered, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = Content}}}, State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, @@ -291,11 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast(flush_multiple_acks, State) -> - {noreply, flush_multiple(State)}; - -handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}. +handle_cast({confirm, MsgSeqNos, From}, State) -> + {noreply, confirm(MsgSeqNos, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM}) -> @@ -303,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> send_or_enqueue_ack(Msg, QPid, State0); + 0 -> confirm([Msg], QPid, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -311,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - State1 = flush_multiple(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( State, [{idle_since, now()}]) end), StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), - {hibernate, State1#ch{stats_timer = StatsTimer1}}. + {hibernate, State#ch{stats_timer = StatsTimer1}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -370,7 +358,7 @@ return_queue_declare_ok(#resource{name = ActualName}, message_count = MessageCount, consumer_count = ConsumerCount}). -check_resource_access(Username, Resource, Perm) -> +check_resource_access(User, Resource, Perm) -> V = {Resource, Perm}, Cache = case get(permission_cache) of undefined -> []; @@ -380,7 +368,7 @@ check_resource_access(Username, Resource, Perm) -> case lists:member(V, Cache) of true -> lists:delete(V, Cache); false -> ok = rabbit_access_control:check_resource_access( - Username, Resource, Perm), + User, Resource, Perm), lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1) end, put(permission_cache, [V | CacheTail]), @@ -390,14 +378,32 @@ clear_permission_cache() -> erase(permission_cache), ok. -check_configure_permitted(Resource, #ch{username = Username}) -> - check_resource_access(Username, Resource, configure). +check_configure_permitted(Resource, #ch{user = User}) -> + check_resource_access(User, Resource, configure). + +check_write_permitted(Resource, #ch{user = User}) -> + check_resource_access(User, Resource, write). -check_write_permitted(Resource, #ch{username = Username}) -> - check_resource_access(Username, Resource, write). +check_read_permitted(Resource, #ch{user = User}) -> + check_resource_access(User, Resource, read). -check_read_permitted(Resource, #ch{username = Username}) -> - check_resource_access(Username, Resource, read). +check_user_id_header(#'P_basic'{user_id = undefined}, _) -> + ok; +check_user_id_header(#'P_basic'{user_id = Username}, + #ch{user = #user{username = Username}}) -> + ok; +check_user_id_header(#'P_basic'{user_id = Claimed}, + #ch{user = #user{username = Actual}}) -> + rabbit_misc:protocol_error( + precondition_failed, "user_id property set to '~s' but " + "authenticated user was '~s'", [Claimed, Actual]). + +check_internal_exchange(#exchange{name = Name, internal = true}) -> + rabbit_misc:protocol_error(access_refused, + "cannot publish to internal ~s", + [rabbit_misc:rs(Name)]); +check_internal_exchange(_) -> + ok. expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( @@ -465,51 +471,30 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -send_or_enqueue_ack(undefined, _QPid, State) -> - State; -send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> +confirm([], _QPid, State) -> State; -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{ - delivery_tag = MSN}), - State1 - end, State); -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{held_confirms = As}) -> - start_confirm_timer( - State1#ch{held_confirms = gb_sets:add(MSN, As)}) - end, State). - -do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, - State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> - %% clears references to MsgSeqNo and does ConfirmFun - case gb_sets:is_element(MsgSeqNo, UC) of - true -> - Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), - case QPid of - undefined -> - ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1}); - _ -> - {ok, Qs} = dict:find(MsgSeqNo, QFM), - Qs1 = sets:del_element(QPid, Qs), - case sets:size(Qs1) of - 0 -> ConfirmFun(MsgSeqNo, - State#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM), - unconfirmed = Unconfirmed1}); - _ -> State#ch{queues_for_msg = - dict:store(MsgSeqNo, Qs1, QFM)} - end - end; - false -> - State - end. +confirm(MsgSeqNos, QPid, State) -> + {DoneMessages, State1} = + lists:foldl( + fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0, + queues_for_msg = QFM0}}) -> + case gb_sets:is_element(MsgSeqNo, UC0) of + false -> {DMs, State0}; + true -> Qs1 = sets:del_element( + QPid, dict:fetch(MsgSeqNo, QFM0)), + case sets:size(Qs1) of + 0 -> {[MsgSeqNo | DMs], + State0#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM0), + unconfirmed = + gb_sets:delete(MsgSeqNo, UC0)}}; + _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), + {DMs, State0#ch{queues_for_msg = QFM1}} + end + end + end, {[], State}, MsgSeqNos), + send_confirms(DoneMessages, State1). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -539,19 +524,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + check_internal_exchange(Exchange), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), + check_user_id_header(DecodedContent#content.properties, State), IsPersistent = is_message_persistent(DecodedContent), - {MsgSeqNo, State1} - = case ConfirmEnabled of - false -> {undefined, State}; - true -> SeqNo = State#ch.publish_seqno, - {SeqNo, - State#ch{publish_seqno = SeqNo + 1, - unconfirmed = - gb_sets:add(SeqNo, State#ch.unconfirmed)}} - end, + {MsgSeqNo, State1} = + case ConfirmEnabled of + false -> {undefined, State}; + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -771,7 +755,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = false, durable = Durable, auto_delete = AutoDelete, - internal = false, + internal = Internal, nowait = NoWait, arguments = Args}, _, State = #ch{virtual_host = VHostPath}) -> @@ -794,10 +778,11 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, CheckedType, Durable, AutoDelete, + Internal, Args) end, ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, - AutoDelete, Args), + AutoDelete, Internal, Args), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, @@ -988,20 +973,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) rabbit_misc:protocol_error( precondition_failed, "cannot switch from tx to confirm mode", []); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = false}) -> - return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, +handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> + return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = true, - confirm_multiple = Multiple}) -> - return_ok(State, NoWait, #'confirm.select_ok'{}); - -handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot change confirm_multiple setting", []); - handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -1231,27 +1206,53 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - send_or_enqueue_ack(MsgSeqNo, undefined, State); + send_confirms([MsgSeqNo], State); process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - send_or_enqueue_ack(MsgSeqNo, undefined, State); + send_confirms([MsgSeqNo], State); process_routing_result(routed, [], MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, undefined, State); + send_confirms([MsgSeqNo], State); process_routing_result(routed, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, MsgSeqNo, _, - State = #ch{queues_for_msg = QFM}) -> - QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), +process_routing_result(routed, QPids, MsgSeqNo, _, State) -> + #ch{queues_for_msg = QFM, unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - State#ch{queues_for_msg = QFM1}. + State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), + unconfirmed = gb_sets:add(MsgSeqNo, UC)}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; lock_message(false, _MsgStruct, State) -> State. -terminate(State) -> - stop_confirm_timer(State), +send_confirms([], State) -> + State; +send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> + send_confirm(MsgSeqNo, WriterPid), + State; +send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + SCs = lists:usort(Cs), + CutOff = case gb_sets:is_empty(UC) of + true -> lists:last(SCs) + 1; + false -> gb_sets:smallest(UC) + end, + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), + case Ms of + [] -> ok; + _ -> ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), + multiple = true}) + end, + [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], + State. + +send_confirm(undefined, _WriterPid) -> + ok; +send_confirm(SeqNo, WriterPid) -> + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = SeqNo}). + +terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). @@ -1260,7 +1261,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid; i(number, #ch{channel = Channel}) -> Channel; -i(user, #ch{username = Username}) -> Username; +i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> @@ -1337,42 +1338,3 @@ erase_queue_stats(QPid) -> erase({queue_stats, QPid}), [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. - -start_confirm_timer(State = #ch{confirm_tref = undefined}) -> - {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL, - ?MODULE, flush_multiple_acks, [self()]), - State#ch{confirm_tref = TRef}; -start_confirm_timer(State) -> - State. - -stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> - State; -stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#ch{confirm_tref = undefined}. - -flush_multiple(State = #ch{writer_pid = WriterPid, - held_confirms = Cs}) -> - case gb_sets:is_empty(Cs) of - true -> State#ch{confirm_tref = undefined}; - false -> [First | Rest] = gb_sets:to_list(Cs), - {Mult, Inds} = find_consecutive_sequence(First, Rest), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Mult, multiple = true}), - ok = lists:foldl( - fun(T, ok) -> rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = T}) - end, ok, Inds), - State#ch{held_confirms = gb_sets:new(), - confirm_tref = undefined} - end. - -%% Find longest sequence of consecutive numbers at the beginning. -find_consecutive_sequence(Last, []) -> - {Last, []}; -find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> - find_consecutive_sequence(N, Ns); -find_consecutive_sequence(Last, Ns) -> - {Last, Ns}. diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 02199a65..9f50176d 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -48,15 +48,15 @@ -type(start_link_args() :: {rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_access_control:username(), rabbit_types:vhost(), pid()}). + rabbit_types:user(), rabbit_types:vhost(), pid()}). --spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}). +-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). -endif. %%---------------------------------------------------------------------------- -start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, +start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = @@ -69,16 +69,11 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, Username, VHost, + [Channel, ReaderPid, WriterPid, User, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), - {ok, FramingChannelPid} = - supervisor2:start_child( - SupPid, - {framing_channel, {rabbit_framing_channel, start_link, - [ReaderPid, ChannelPid, Protocol]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), - {ok, SupPid, FramingChannelPid}. + {ok, AState} = rabbit_command_assembler:init(Protocol), + {ok, SupPid, {ChannelPid, AState}}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 21c39780..fd99af56 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -43,7 +43,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> - {'ok', pid(), pid()}). + {'ok', pid(), {pid(), any()}}). -endif. diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl new file mode 100644 index 00000000..f8d3260e --- /dev/null +++ b/src/rabbit_command_assembler.erl @@ -0,0 +1,148 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_command_assembler). +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +-export([analyze_frame/3, init/1, process/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY | + ?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY | + ?FRAME_TRACE | ?FRAME_HEARTBEAT). +-type(protocol() :: rabbit_framing:protocol()). +-type(method() :: rabbit_framing:amqp_method_record()). +-type(class_id() :: rabbit_framing:amqp_class_id()). +-type(weight() :: non_neg_integer()). +-type(body_size() :: non_neg_integer()). +-type(content() :: rabbit_types:undecoded_content()). + +-type(frame() :: + {'method', rabbit_framing:amqp_method_name(), binary()} | + {'content_header', class_id(), weight(), body_size(), binary()} | + {'content_body', binary()}). + +-type(state() :: + {'method', protocol()} | + {'content_header', method(), class_id(), protocol()} | + {'content_body', method(), body_size(), class_id(), protocol()}). + +-spec(analyze_frame/3 :: (frame_type(), binary(), protocol()) -> + frame() | 'heartbeat' | 'error'). + +-spec(init/1 :: (protocol()) -> {ok, state()}). +-spec(process/2 :: (frame(), state()) -> + {ok, state()} | + {ok, method(), state()} | + {ok, method(), content(), state()} | + {error, rabbit_types:amqp_error()}). + +-endif. + +%%-------------------------------------------------------------------- + +analyze_frame(?FRAME_METHOD, + <<ClassId:16, MethodId:16, MethodFields/binary>>, + Protocol) -> + MethodName = Protocol:lookup_method_name({ClassId, MethodId}), + {method, MethodName, MethodFields}; +analyze_frame(?FRAME_HEADER, + <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, + _Protocol) -> + {content_header, ClassId, Weight, BodySize, Properties}; +analyze_frame(?FRAME_BODY, Body, _Protocol) -> + {content_body, Body}; +analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> + heartbeat; +analyze_frame(_Type, _Body, _Protocol) -> + error. + +init(Protocol) -> {ok, {method, Protocol}}. + +process({method, MethodName, FieldsBin}, {method, Protocol}) -> + try + Method = Protocol:decode_method_fields(MethodName, FieldsBin), + case Protocol:method_has_content(MethodName) of + true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), + {ok, {content_header, Method, ClassId, Protocol}}; + false -> {ok, Method, {method, Protocol}} + end + catch exit:#amqp_error{} = Reason -> {error, Reason} + end; +process(_Frame, {method, _Protocol}) -> + unexpected_frame("expected method frame, " + "got non method frame instead", [], none); +process({content_header, ClassId, 0, 0, PropertiesBin}, + {content_header, Method, ClassId, Protocol}) -> + Content = empty_content(ClassId, PropertiesBin, Protocol), + {ok, Method, Content, {method, Protocol}}; +process({content_header, ClassId, 0, BodySize, PropertiesBin}, + {content_header, Method, ClassId, Protocol}) -> + Content = empty_content(ClassId, PropertiesBin, Protocol), + {ok, {content_body, Method, BodySize, Content, Protocol}}; +process({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin}, + {content_header, Method, ClassId, _Protocol}) -> + unexpected_frame("expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId], Method); +process(_Frame, {content_header, Method, ClassId, _Protocol}) -> + unexpected_frame("expected content header for class ~w, " + "got non content header frame instead", [ClassId], Method); +process({content_body, FragmentBin}, + {content_body, Method, RemainingSize, + Content = #content{payload_fragments_rev = Fragments}, Protocol}) -> + NewContent = Content#content{ + payload_fragments_rev = [FragmentBin | Fragments]}, + case RemainingSize - size(FragmentBin) of + 0 -> {ok, Method, NewContent, {method, Protocol}}; + Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}} + end; +process(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) -> + unexpected_frame("expected content body, " + "got non content body frame instead", [], Method). + +%%-------------------------------------------------------------------- + +empty_content(ClassId, PropertiesBin, Protocol) -> + #content{class_id = ClassId, + properties = none, + properties_bin = PropertiesBin, + protocol = Protocol, + payload_fragments_rev = []}. + +unexpected_frame(Format, Params, Method) when is_atom(Method) -> + {error, rabbit_misc:amqp_error(unexpected_frame, Format, Params, Method)}; +unexpected_frame(Format, Params, Method) -> + unexpected_frame(Format, Params, rabbit_misc:method_record_type(Method)). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index ff3995b5..a6b1f7fa 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -78,4 +78,3 @@ reader(Pid) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. - diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 360217a2..8a3275bc 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -201,44 +201,48 @@ action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> action(add_user, Node, Args = [Username, _Password], _Opts, Inform) -> Inform("Creating user ~p", [Username]), - call(Node, {rabbit_access_control, add_user, Args}); + call(Node, {rabbit_auth_backend_internal, add_user, Args}); action(delete_user, Node, Args = [_Username], _Opts, Inform) -> Inform("Deleting user ~p", Args), - call(Node, {rabbit_access_control, delete_user, Args}); + call(Node, {rabbit_auth_backend_internal, delete_user, Args}); action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> Inform("Changing password for user ~p", [Username]), - call(Node, {rabbit_access_control, change_password, Args}); + call(Node, {rabbit_auth_backend_internal, change_password, Args}); + +action(clear_password, Node, Args = [Username], _Opts, Inform) -> + Inform("Clearing password for user ~p", [Username]), + call(Node, {rabbit_auth_backend_internal, clear_password, Args}); action(set_admin, Node, [Username], _Opts, Inform) -> Inform("Setting administrative status for user ~p", [Username]), - call(Node, {rabbit_access_control, set_admin, [Username]}); + call(Node, {rabbit_auth_backend_internal, set_admin, [Username]}); action(clear_admin, Node, [Username], _Opts, Inform) -> Inform("Clearing administrative status for user ~p", [Username]), - call(Node, {rabbit_access_control, clear_admin, [Username]}); + call(Node, {rabbit_auth_backend_internal, clear_admin, [Username]}); action(list_users, Node, [], _Opts, Inform) -> Inform("Listing users", []), - display_list(call(Node, {rabbit_access_control, list_users, []})); + display_list(call(Node, {rabbit_auth_backend_internal, list_users, []})); action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Creating vhost ~p", Args), - call(Node, {rabbit_access_control, add_vhost, Args}); + call(Node, {rabbit_vhost, add, Args}); action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Deleting vhost ~p", Args), - call(Node, {rabbit_access_control, delete_vhost, Args}); + call(Node, {rabbit_vhost, delete, Args}); action(list_vhosts, Node, [], _Opts, Inform) -> Inform("Listing vhosts", []), - display_list(call(Node, {rabbit_access_control, list_vhosts, []})); + display_list(call(Node, {rabbit_vhost, list, []})); action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) -> Inform("Listing permissions for user ~p", Args), - display_list(call(Node, {rabbit_access_control, list_user_permissions, - Args})); + display_list(call(Node, {rabbit_auth_backend_internal, + list_user_permissions, Args})); action(list_queues, Node, Args, Opts, Inform) -> Inform("Listing queues", []), @@ -292,19 +296,20 @@ action(list_consumers, Node, _Args, Opts, Inform) -> action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), - call(Node, {rabbit_access_control, set_permissions, + call(Node, {rabbit_auth_backend_internal, set_permissions, [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, [Username], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), - call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]}); + call(Node, {rabbit_auth_backend_internal, clear_permissions, + [Username, VHost]}); action(list_permissions, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost ~p", [VHost]), - display_list(call(Node, {rabbit_access_control, list_vhost_permissions, - [VHost]})). + display_list(call(Node, {rabbit_auth_backend_internal, + list_vhost_permissions, [VHost]})). default_if_empty(List, Default) when is_list(List) -> if List == [] -> diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 42861f86..dd009c83 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -49,7 +49,7 @@ boot() -> init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, []), + topic, true, false, false, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 00e479a2..a95cf0b1 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -33,11 +33,11 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, +-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). --export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]). +-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- @@ -49,13 +49,14 @@ -type(type() :: atom()). -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: - (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table()) +-spec(declare/6 :: + (name(), type(), boolean(), boolean(), boolean(), + rabbit_framing:amqp_table()) -> rabbit_types:exchange()). -spec(check_type/1 :: (binary()) -> atom() | rabbit_types:connection_exit()). --spec(assert_equivalence/5 :: - (rabbit_types:exchange(), atom(), boolean(), boolean(), +-spec(assert_equivalence/6 :: + (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) -> 'ok' | rabbit_types:connection_exit()). -spec(assert_args_equivalence/2 :: @@ -90,7 +91,7 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, arguments]). +-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). recover() -> Xs = rabbit_misc:table_fold( @@ -113,11 +114,12 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(XName, Type, Durable, AutoDelete, Args) -> +declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = #exchange{name = XName, type = Type, durable = Durable, auto_delete = AutoDelete, + internal = Internal, arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return @@ -150,17 +152,17 @@ declare(XName, Type, Durable, AutoDelete, Args) -> %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + {ok, Module} = rabbit_registry:lookup_module(exchange, T), Module. %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> - case rabbit_exchange_type_registry:binary_to_type(TypeBin) of + case rabbit_registry:binary_to_type(TypeBin) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "unknown exchange type '~s'", [TypeBin]); T -> - case rabbit_exchange_type_registry:lookup_module(T) of + case rabbit_registry:lookup_module(exchange, T) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]); @@ -170,14 +172,16 @@ check_type(TypeBin) -> assert_equivalence(X = #exchange{ durable = Durable, auto_delete = AutoDelete, + internal = Internal, type = Type}, - Type, Durable, AutoDelete, RequiredArgs) -> + Type, Durable, AutoDelete, Internal, RequiredArgs) -> (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); -assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, - _Args) -> +assert_equivalence(#exchange{ name = Name }, + _Type, _Durable, _Internal, _AutoDelete, _Args) -> rabbit_misc:protocol_error( precondition_failed, - "cannot redeclare ~s with different type, durable or autodelete value", + "cannot redeclare ~s with different type, durable, " + "internal or autodelete value", [rabbit_misc:rs(Name)]). assert_args_equivalence(#exchange{ name = Name, arguments = Args }, @@ -215,6 +219,7 @@ i(name, #exchange{name = Name}) -> Name; i(type, #exchange{type = Type}) -> Type; i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; +i(internal, #exchange{internal = Internal}) -> Internal; i(arguments, #exchange{arguments = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index d934a497..d49d0199 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -41,9 +41,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type direct"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"direct">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"direct">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). description() -> diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 77ca9686..e7f75464 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -41,9 +41,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type fanout"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"fanout">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"fanout">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). description() -> diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index ec9e7ba4..caf141fe 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -42,9 +42,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type headers"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"headers">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"headers">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). -ifdef(use_specs). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index d3ecdd4d..44851858 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -41,9 +41,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type topic"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"topic">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"topic">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). -export([topic_matches/2]). diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl deleted file mode 100644 index cb53185f..00000000 --- a/src/rabbit_framing_channel.erl +++ /dev/null @@ -1,129 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_framing_channel). --include("rabbit.hrl"). - --export([start_link/3, process/2, shutdown/1]). - -%% internal --export([mainloop/3]). - -%%-------------------------------------------------------------------- - -start_link(Parent, ChannelPid, Protocol) -> - {ok, proc_lib:spawn_link( - fun () -> mainloop(Parent, ChannelPid, Protocol) end)}. - -process(Pid, Frame) -> - Pid ! {frame, Frame}, - ok. - -shutdown(Pid) -> - Pid ! terminate, - ok. - -%%-------------------------------------------------------------------- - -read_frame(ChannelPid) -> - receive - {frame, Frame} -> Frame; - terminate -> rabbit_channel:shutdown(ChannelPid), - read_frame(ChannelPid); - Msg -> exit({unexpected_message, Msg}) - end. - -mainloop(Parent, ChannelPid, Protocol) -> - case read_frame(ChannelPid) of - {method, MethodName, FieldsBin} -> - Method = Protocol:decode_method_fields(MethodName, FieldsBin), - case Protocol:method_has_content(MethodName) of - true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), - case collect_content(ChannelPid, ClassId, Protocol) of - {ok, Content} -> - rabbit_channel:do(ChannelPid, Method, Content), - ?MODULE:mainloop(Parent, ChannelPid, Protocol); - {error, Reason} -> - channel_exit(Parent, Reason, MethodName) - end; - false -> rabbit_channel:do(ChannelPid, Method), - ?MODULE:mainloop(Parent, ChannelPid, Protocol) - end; - _ -> - channel_exit(Parent, {unexpected_frame, - "expected method frame, " - "got non method frame instead", - []}, none) - end. - -collect_content(ChannelPid, ClassId, Protocol) -> - case read_frame(ChannelPid) of - {content_header, ClassId, 0, BodySize, PropertiesBin} -> - case collect_content_payload(ChannelPid, BodySize, []) of - {ok, Payload} -> {ok, #content{ - class_id = ClassId, - properties = none, - properties_bin = PropertiesBin, - protocol = Protocol, - payload_fragments_rev = Payload}}; - Error -> Error - end; - {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> - {error, {unexpected_frame, - "expected content header for class ~w, " - "got one for class ~w instead", - [ClassId, HeaderClassId]}}; - _ -> - {error, {unexpected_frame, - "expected content header for class ~w, " - "got non content header frame instead", - [ClassId]}} - end. - -collect_content_payload(_ChannelPid, 0, Acc) -> - {ok, Acc}; -collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> - case read_frame(ChannelPid) of - {content_body, FragmentBin} -> - collect_content_payload(ChannelPid, - RemainingByteCount - size(FragmentBin), - [FragmentBin | Acc]); - _ -> - {error, {unexpected_frame, - "expected content body, " - "got non content body frame instead", - []}} - end. - -channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) -> - Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params, - MethodName), - Parent ! {channel_exit, self(), Reason}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 52d76ac4..15ba787a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,7 +46,7 @@ -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). --export([with_user/2, with_vhost/2, with_user_and_vhost/3]). +-export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). @@ -72,7 +72,7 @@ -ifdef(use_specs). --export_type([resource_name/0]). +-export_type([resource_name/0, thunk/1]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -137,10 +137,9 @@ (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(with_user/2 :: (rabbit_access_control:username(), thunk(A)) -> A). --spec(with_vhost/2 :: (rabbit_types:vhost(), thunk(A)) -> A). +-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: - (rabbit_access_control:username(), rabbit_types:vhost(), thunk(A)) + (rabbit_types:username(), rabbit_types:vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). @@ -344,8 +343,8 @@ throw_on_error(E, Thunk) -> with_exit_handler(Handler, Thunk) -> try Thunk() - catch - exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown -> + catch exit:{R, _} when R =:= noproc; R =:= nodedown; + R =:= normal; R =:= shutdown -> Handler() end. @@ -366,19 +365,8 @@ with_user(Username, Thunk) -> end end. -with_vhost(VHostPath, Thunk) -> - fun () -> - case mnesia:read({rabbit_vhost, VHostPath}) of - [] -> - mnesia:abort({no_such_vhost, VHostPath}); - [_V] -> - Thunk() - end - end. - with_user_and_vhost(Username, VHostPath, Thunk) -> - with_user(Username, with_vhost(VHostPath, Thunk)). - + with_user(Username, rabbit_vhost:with(VHostPath, Thunk)). execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read @@ -589,19 +577,19 @@ sort_field_table(Arguments) -> pid_to_string(Pid) when is_pid(Pid) -> %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and %% 8.7) - <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> = term_to_binary(Pid), Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), - lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). + lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])). %% inverse of above string_to_pid(Str) -> Err = {error, {invalid_pid_syntax, Str}}, %% The \ before the trailing $ is only there to keep emacs %% font-lock from getting confused. - case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$", + case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$", [{capture,all_but_first,list}]) of - {match, [NodeStr, IdStr, SerStr]} -> + {match, [NodeStr, CreStr, IdStr, SerStr]} -> %% the NodeStr atom might be quoted, so we have to parse %% it rather than doing a simple list_to_atom NodeAtom = case erl_scan:string(NodeStr) of @@ -609,9 +597,9 @@ string_to_pid(Str) -> {error, _, _} -> throw(Err) end, <<131,NodeEnc/binary>> = term_to_binary(NodeAtom), - Id = list_to_integer(IdStr), - Ser = list_to_integer(SerStr), - binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); + [Cre, Id, Ser] = lists:map(fun list_to_integer/1, + [CreStr, IdStr, SerStr]), + binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>); nomatch -> throw(Err) end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a62e7a6f..38cc82a6 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -34,7 +34,8 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, - is_clustered/0, empty_ram_only_tables/0, copy_db/1]). + is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, + empty_ram_only_tables/0, copy_db/1]). -export([table_names/0]). @@ -63,6 +64,8 @@ -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). -spec(is_clustered/0 :: () -> boolean()). +-spec(running_clustered_nodes/0 :: () -> [node()]). +-spec(all_clustered_nodes/0 :: () -> [node()]). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). @@ -81,12 +84,12 @@ status() -> Nodes = nodes_of_type(CopyType), Nodes =/= [] end]; - no -> case mnesia:system_info(db_nodes) of + no -> case all_clustered_nodes() of [] -> []; Nodes -> [{unknown, Nodes}] end end}, - {running_nodes, mnesia:system_info(running_db_nodes)}]. + {running_nodes, running_clustered_nodes()}]. init() -> ok = ensure_mnesia_running(), @@ -127,9 +130,15 @@ reset() -> reset(false). force_reset() -> reset(true). is_clustered() -> - RunningNodes = mnesia:system_info(running_db_nodes), + RunningNodes = running_clustered_nodes(), [node()] /= RunningNodes andalso [] /= RunningNodes. +all_clustered_nodes() -> + mnesia:system_info(db_nodes). + +running_clustered_nodes() -> + mnesia:system_info(running_db_nodes). + empty_ram_only_tables() -> Node = node(), lists:foreach( @@ -154,10 +163,10 @@ nodes_of_type(Type) -> table_definitions() -> [{rabbit_user, - [{record_name, user}, - {attributes, record_info(fields, user)}, + [{record_name, internal_user}, + {attributes, record_info(fields, internal_user)}, {disc_copies, [node()]}, - {match, #user{_='_'}}]}, + {match, #internal_user{_='_'}}]}, {rabbit_user_permission, [{record_name, user_permission}, {attributes, record_info(fields, user_permission)}, @@ -372,8 +381,7 @@ init_db(ClusterNodes, Force) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir), - mnesia:system_info(db_nodes)} of + case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of {[], true, [_]} -> %% True single disc node, attempt upgrade ok = wait_for_tables(), @@ -388,7 +396,7 @@ init_db(ClusterNodes, Force) -> ensure_version_ok(rabbit_upgrade:read_version()), ensure_schema_ok(); {[], false, _} -> - %% First RAM node in cluster, start from scratch + %% Nothing there at all, start from scratch ok = create_schema(); {[AnotherNode|_], _, _} -> %% Subsequent node in cluster, catch up @@ -566,8 +574,8 @@ reset(Force) -> {Nodes, RunningNodes} = try ok = init(), - {mnesia:system_info(db_nodes) -- [Node], - mnesia:system_info(running_db_nodes) -- [Node]} + {all_clustered_nodes() -- [Node], + running_clustered_nodes() -- [Node]} after mnesia:stop() end, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e8b4e8e2..f8b41ed3 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -81,6 +81,7 @@ file_summary_ets, %% tid of the file summary table dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table + dying_clients, %% set of dying clients client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? @@ -306,6 +307,17 @@ %% sure that reads are not attempted from files which are in the %% process of being garbage collected. %% +%% When a message is removed, its reference count is decremented. Even +%% if the reference count becomes 0, its entry is not removed. This is +%% because in the event of the same message being sent to several +%% different queues, there is the possibility of one queue writing and +%% removing the message before other queues write it at all. Thus +%% accomodating 0-reference counts allows us to avoid unnecessary +%% writes here. Of course, there are complications: the file to which +%% the message has already been written could be locked pending +%% deletion or GC, which means we have to rewrite the message as the +%% original copy will now be lost. +%% %% The server automatically defers reads, removes and contains calls %% that occur which refer to files which are currently being %% GC'd. Contains calls are only deferred in order to ensure they do @@ -323,6 +335,55 @@ %% heavily overloaded, clients can still write and read messages with %% very low latency and not block at all. %% +%% Clients of the msg_store are required to register before using the +%% msg_store. This provides them with the necessary client-side state +%% to allow them to directly access the various caches and files. When +%% they terminate, they should deregister. They can do this by calling +%% either client_terminate/1 or client_delete_and_terminate/1. The +%% differences are: (a) client_terminate is synchronous. As a result, +%% if the msg_store is badly overloaded and has lots of in-flight +%% writes and removes to process, this will take some time to +%% return. However, once it does return, you can be sure that all the +%% actions you've issued to the msg_store have been processed. (b) Not +%% only is client_delete_and_terminate/1 asynchronous, but it also +%% permits writes and subsequent removes from the current +%% (terminating) client which are still in flight to be safely +%% ignored. Thus from the point of view of the msg_store itself, and +%% all from the same client: +%% +%% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N +%% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 --> +%% +%% The client obviously sent T after all the other messages (up to +%% W4), but because the msg_store prioritises messages, the T can be +%% promoted and thus received early. +%% +%% Thus at the point of the msg_store receiving T, we have messages 1 +%% and 2 with a refcount of 1. After T, W3 will be ignored because +%% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be +%% ignored because the messages that they refer to were already known +%% to the msg_store prior to T. However, it can be a little more +%% complex: after the first R2, the refcount of msg 2 is 0. At that +%% point, if a GC occurs or file deletion, msg 2 could vanish, which +%% would then mean that the subsequent W2 and R2 are then ignored. +%% +%% The use case then for client_delete_and_terminate/1 is if the +%% client wishes to remove everything it's written to the msg_store: +%% it issues removes for all messages it's written and not removed, +%% and then calls client_delete_and_terminate/1. At that point, any +%% in-flight writes (and subsequent removes) can be ignored, but +%% removes and writes for messages the msg_store already knows about +%% will continue to be processed normally (which will normally just +%% involve modifying the reference count, which is fast). Thus we save +%% disk bandwidth for writes which are going to be immediately removed +%% again by the the terminating client. +%% +%% We use a separate set to keep track of the dying clients in order +%% to keep that set, which is inspected on every write and remove, as +%% small as possible. Inspecting client_refs - the set of all clients +%% - would degrade performance with many healthy clients and few, if +%% any, dying clients, which is the typical case. +%% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -361,6 +422,7 @@ client_terminate(CState = #client_msstate { client_ref = Ref }) -> client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), + ok = server_cast(CState, {client_dying, Ref}), ok = server_cast(CState, {client_delete, Ref}). client_ref(#client_msstate { client_ref = Ref }) -> Ref. @@ -598,6 +660,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, + dying_clients = sets:new(), client_refs = ClientRefs1, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, @@ -643,6 +706,7 @@ prioritise_cast(Msg, _State) -> {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; {set_maximum_since_use, _Age} -> 8; + {client_dying, _Pid} -> 7; _ -> 0 end. @@ -681,65 +745,65 @@ handle_call({contains, Guid}, From, State) -> State1 = contains_message(Guid, From, State), noreply(State1). +handle_cast({client_dying, CRef}, + State = #msstate { dying_clients = DyingClients }) -> + DyingClients1 = sets:add_element(CRef, DyingClients), + write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); + handle_cast({client_delete, CRef}, - State = #msstate { client_refs = ClientRefs }) -> - State1 = clear_client_callback(CRef, State), - noreply(State1 #msstate { - client_refs = sets:del_element(CRef, ClientRefs) }); + State = #msstate { client_refs = ClientRefs, + dying_clients = DyingClients }) -> + State1 = clear_client_callback( + CRef, State #msstate { + client_refs = sets:del_element(CRef, ClientRefs), + dying_clients = sets:del_element(CRef, DyingClients) }), + noreply(remove_message(CRef, CRef, State1)); handle_cast({write, CRef, Guid}, - State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - current_file = CurFile, + State = #msstate { file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, client_ondisk_callback = CODC, cref_to_guids = CTG }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - CTG1 = case dict:find(CRef, CODC) of - {ok, _} -> dict:update(CRef, fun(Guids) -> - gb_sets:add(Guid, Guids) - end, - gb_sets:empty(), CTG); - error -> CTG - end, + CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC), State1 = State #msstate { cref_to_guids = CTG1 }, - case index_lookup(Guid, State1) of - not_found -> + case should_mask_action(CRef, Guid, State) of + {true, _Location} -> + noreply(State); + {false, not_found} -> write_message(Guid, Msg, State1); - #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> - case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> + {Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize }} -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> ok = index_delete(Guid, State1), write_message(Guid, Msg, State1); - [#file_summary {}] -> - ok = index_update_ref_count(Guid, 1, State1), - [_] = ets:update_counter( - FileSummaryEts, File, - [{#file_summary.valid_total_size, TotalSize}]), - noreply(State1 #msstate { - sum_valid_data = SumValid + TotalSize }) + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client death + %% message, but as it is being GC'd currently, + %% we'll have to write a new copy, which will then + %% be younger, so ignore this write. + noreply(State); + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + State2 = client_confirm_if_on_disk(CRef, Guid, File, State), + noreply(adjust_valid_total_size(File, TotalSize, State2)) end; - #msg_location { ref_count = RefCount, file = File } -> + {_Mask, #msg_location { ref_count = RefCount, file = File }} -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State1), - CTG2 = case {dict:find(CRef, CODC), File} of - {{ok, _}, CurFile} -> CTG1; - {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)), - CTG; - _ -> CTG1 - end, - noreply(State #msstate { cref_to_guids = CTG2 }) + ok = index_update_ref_count(Guid, RefCount + 1, State), + noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) end; handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( - fun (Guid, State2) -> remove_message(Guid, State2) end, + fun (Guid, State2) -> remove_message(Guid, CRef, State2) end, State, Guids), - State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1), - noreply(maybe_compact(State2)); + noreply(maybe_compact( + client_confirm(CRef, gb_sets:from_list(Guids), removed, State1))); handle_cast({release, Guids}, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -861,9 +925,9 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs, - cref_to_guids = CTG }) -> +internal_sync(State = #msstate { current_file_handle = CurHdl, + on_sync = Syncs, + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), CGs = dict:fold(fun (CRef, Guids, NS) -> case gb_sets:is_empty(Guids) of @@ -871,14 +935,14 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, false -> [{CRef, Guids} | NS] end end, [], CTG), - if Syncs =:= [] andalso CGs =:= [] -> ok; - true -> file_handle_cache:sync(CurHdl) + case {Syncs, CGs} of + {[], []} -> ok; + _ -> file_handle_cache:sync(CurHdl) end, - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs], + [K() || K <- lists:reverse(Syncs)], + [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. - write_message(Guid, Msg, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, @@ -990,34 +1054,43 @@ contains_message(Guid, From, end end. -remove_message(Guid, State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> - #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize } = - index_lookup_positive_ref_count(Guid, State), - %% only update field, otherwise bad interaction with concurrent GC - Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, - case RefCount of - %% don't remove from CUR_FILE_CACHE_ETS_NAME here because - %% there may be further writes in the mailbox for the same - %% msg. - 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), - case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true } ] -> - add_to_pending_gc_completion({remove, Guid}, File, State); - [#file_summary {}] -> +remove_message(Guid, CRef, + State = #msstate { file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts }) -> + case should_mask_action(CRef, Guid, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this msg + %% whilst it's being GC'd. ASSERTION: + %% [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} when RefCount > 0 -> + %% only update field, otherwise bad interaction with + %% concurrent GC + Dec = + fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, + case RefCount of + %% don't remove from CUR_FILE_CACHE_ETS_NAME here + %% because there may be further writes in the mailbox + %% for the same msg. + 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), + case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, Guid, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + delete_file_if_empty( + File, adjust_valid_total_size(File, -TotalSize, + State)) + end; + _ -> ok = decrement_cache(DedupCacheEts, Guid), ok = Dec(), - [_] = ets:update_counter( - FileSummaryEts, File, - [{#file_summary.valid_total_size, -TotalSize}]), - delete_file_if_empty( - File, State #msstate { - sum_valid_data = SumValid - TotalSize }) - end; - _ -> ok = decrement_cache(DedupCacheEts, Guid), - ok = Dec(), - State + State + end end. add_to_pending_gc_completion( @@ -1039,8 +1112,8 @@ run_pending_action({read, Guid, From}, State) -> read_message(Guid, From, State); run_pending_action({contains, Guid, From}, State) -> contains_message(Guid, From, State); -run_pending_action({remove, Guid}, State) -> - remove_message(Guid, State). +run_pending_action({remove, Guid, CRef}, State) -> + remove_message(Guid, CRef, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> try @@ -1051,15 +1124,22 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). +adjust_valid_total_size(File, Delta, State = #msstate { + sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts }) -> + [_] = ets:update_counter(FileSummaryEts, File, + [{#file_summary.valid_total_size, Delta}]), + State #msstate { sum_valid_data = SumValid + Delta }. + orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). -client_confirm(CRef, Guids, +client_confirm(CRef, Guids, ActionTaken, State = #msstate { client_ondisk_callback = CODC, cref_to_guids = CTG }) -> case dict:find(CRef, CODC) of - {ok, Fun} -> Fun(Guids), + {ok, Fun} -> Fun(Guids, ActionTaken), CTG1 = case dict:find(CRef, CTG) of {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), @@ -1073,6 +1153,52 @@ client_confirm(CRef, Guids, error -> State end. +add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) -> + case dict:find(CRef, CODC) of + {ok, _} -> dict:update(CRef, + fun (Guids) -> gb_sets:add(Guid, Guids) end, + gb_sets:singleton(Guid), CTG); + error -> CTG + end. + +client_confirm_if_on_disk(CRef, Guid, File, + State = #msstate { client_ondisk_callback = CODC, + current_file = CurFile, + cref_to_guids = CTG }) -> + CTG1 = + case File of + CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC); + _ -> case dict:find(CRef, CODC) of + {ok, Fun} -> Fun(gb_sets:singleton(Guid), written); + _ -> ok + end, + CTG + end, + State #msstate { cref_to_guids = CTG1 }. + +%% Detect whether the Guid is older or younger than the client's death +%% msg (if there is one). If the msg is older than the client death +%% msg, and it has a 0 ref_count we must only alter the ref_count, not +%% rewrite the msg - rewriting it would make it younger than the death +%% msg and thus should be ignored. Note that this (correctly) returns +%% false when testing to remove the death msg itself. +should_mask_action(CRef, Guid, + State = #msstate { dying_clients = DyingClients }) -> + case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of + {false, Location} -> + {false, Location}; + {true, not_found} -> + {true, not_found}; + {true, #msg_location { file = File, offset = Offset, + ref_count = RefCount } = Location} -> + #msg_location { file = DeathFile, offset = DeathOffset } = + index_lookup(CRef, State), + {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of + {true, _} -> true; + {false, 0} -> false_if_increment; + {false, _} -> false + end, Location} + end. %%---------------------------------------------------------------------------- %% file helper functions diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 89954b06..c6a083bb 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -32,7 +32,7 @@ -module(rabbit_net). -include("rabbit.hrl"). --export([is_ssl/1, controlling_process/2, getstat/2, +-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, async_recv/3, port_command/2, send/2, close/1, sockname/1, peername/1, peercert/1]). @@ -50,6 +50,9 @@ -type(socket() :: port() | #ssl_socket{}). -spec(is_ssl/1 :: (socket()) -> boolean()). +-spec(ssl_info/1 :: (socket()) + -> 'nossl' | ok_val_or_error( + {atom(), {atom(), atom(), atom()}})). -spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()). -spec(getstat/2 :: (socket(), [stat_option()]) @@ -77,6 +80,11 @@ is_ssl(Sock) -> ?IS_SSL(Sock). +ssl_info(Sock) when ?IS_SSL(Sock) -> + ssl:connection_info(Sock#ssl_socket.ssl); +ssl_info(_Sock) -> + nossl. + controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> ssl:controlling_process(Sock#ssl_socket.ssl, Pid); controlling_process(Sock, Pid) when is_port(Sock) -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 1c542ffe..d5a9d73c 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -246,8 +246,9 @@ start_ssl_client(SslOpts, Sock) -> connections() -> [rabbit_connection_sup:reader(ConnSup) || + Node <- rabbit_mnesia:running_clustered_nodes(), {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup)]. + <- supervisor:which_children({rabbit_tcp_client_sup, Node})]. connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 867ecb12..8ae45abd 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -53,7 +53,7 @@ start() -> io:format("Activating RabbitMQ plugins ...~n"), %% Determine our various directories - [PluginDir, UnpackedPluginDir, Node] = init:get_plain_arguments(), + [PluginDir, UnpackedPluginDir, NodeStr] = init:get_plain_arguments(), RootName = UnpackedPluginDir ++ "/rabbit", %% Unpack any .ez plugins @@ -132,7 +132,7 @@ start() -> || App <- PluginApps], io:nl(), - ok = duplicate_node_check(Node), + ok = duplicate_node_check(NodeStr), terminate(0), ok. @@ -259,7 +259,8 @@ process_entry(Entry) -> duplicate_node_check([]) -> %% Ignore running node while installing windows service ok; -duplicate_node_check(Node) -> +duplicate_node_check(NodeStr) -> + Node = rabbit_misc:makenode(NodeStr), {NodeName, NodeHost} = rabbit_misc:nodeparts(Node), case net_adm:names(NodeHost) of {ok, NamePorts} -> @@ -272,7 +273,6 @@ duplicate_node_check(Node) -> terminate(?ERROR_CODE); false -> ok end; - {error, address} -> ok; {error, EpmdReason} -> terminate("unexpected epmd error: ~p~n", [EpmdReason]) end. @@ -283,12 +283,9 @@ terminate(Fmt, Args) -> terminate(Status) -> case os:type() of - {unix, _} -> - halt(Status); - {win32, _} -> - init:stop(Status), - receive - after infinity -> ok - end + {unix, _} -> halt(Status); + {win32, _} -> init:stop(Status), + receive + after infinity -> ok + end end. - diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76c0a4ef..2162104f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -33,7 +33,7 @@ -export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, + publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). @@ -297,11 +297,12 @@ deliver(SeqIds, State) -> ack(SeqIds, State) -> deliver_or_ack(ack, SeqIds, State). -sync([], State) -> - State; -sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> - State; -sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> +%% This is only called when there are outstanding confirms and the +%% queue is idle. +sync(State = #qistate { unsynced_guids = Guids }) -> + sync_if([] =/= Guids, State). + +sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack in %% the transaction. Ideally we should go through these seqids and %% only sync the journal if the pubs or acks appear in the @@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% the variable queue publishes and acks to the qi, and then %% syncs, all in one operation, there is no possibility of the %% seqids not being in the journal, provided the transaction isn't - %% emptied (handled above anyway). - ok = file_handle_cache:sync(JournalHdl), - notify_sync(State). + %% emptied (handled by sync_if anyway). + sync_if([] =/= SeqIds, State). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). +sync_if(false, State) -> + State; +sync_if(_Bool, State = #qistate { journal_handle = undefined }) -> + State; +sync_if(true, State = #qistate { journal_handle = JournalHdl }) -> + ok = file_handle_cache:sync(JournalHdl), + notify_sync(State). + notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> OnSyncFun(gb_sets:from_list(UG)), State #qistate { unsynced_guids = [] }. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index cdb3586a..05e383d8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,7 +41,7 @@ -export([conserve_memory/2, server_properties/0]). --export([analyze_frame/3]). +-export([process_channel_frame/5]). %% used by erlang-client -export([emit_stats/1]). @@ -55,14 +55,17 @@ -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun}). + channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism, + auth_state}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, - peer_cert_validity, + peer_cert_validity, auth_mechanism, + ssl_protocol, ssl_key_exchange, + ssl_cipher, ssl_hash, protocol, user, vhost, timeout, frame_max, client_properties]). @@ -293,7 +296,9 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, stats_timer = rabbit_event:init_stats_timer(), channel_sup_sup_pid = ChannelSupSupPid, - start_heartbeat_fun = StartHeartbeatFun + start_heartbeat_fun = StartHeartbeatFun, + auth_mechanism = none, + auth_state = none }, handshake, 8)) catch @@ -343,12 +348,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> + {channel_exit, _Channel, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, ChannelOrFrPid, Reason} -> - mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); - {'DOWN', _MRef, process, ChSupPid, Reason} -> - mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); + {channel_exit, Channel, Reason} -> + mainloop(Deb, handle_exception(State, Channel, Reason)); + {'DOWN', _MRef, process, ChPid, Reason} -> + mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); terminate_connection -> State; handshake_timeout -> @@ -439,45 +444,32 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) -> - {channel, Channel} = get({ch_fr_pid, ChFrPid}), - handle_exception(State, Channel, Reason); -handle_channel_exit(Channel, Reason, State) -> - handle_exception(State, Channel, Reason). - -handle_dependent_exit(ChSupPid, Reason, State) -> +handle_dependent_exit(ChPid, Reason, State) -> case termination_kind(Reason) of controlled -> - case erase({ch_sup_pid, ChSupPid}) of - undefined -> ok; - {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr) - end, + erase({ch_pid, ChPid}), maybe_close(State); uncontrolled -> - case channel_cleanup(ChSupPid) of - undefined -> - exit({abnormal_dependent_exit, ChSupPid, Reason}); - Channel -> - maybe_close(handle_exception(State, Channel, Reason)) + case channel_cleanup(ChPid) of + undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); + Channel -> maybe_close( + handle_exception(State, Channel, Reason)) end end. -channel_cleanup(ChSupPid) -> - case get({ch_sup_pid, ChSupPid}) of - undefined -> undefined; - {{channel, Channel}, ChFr} -> erase({channel, Channel}), - erase(ChFr), - erase({ch_sup_pid, ChSupPid}), - Channel +channel_cleanup(ChPid) -> + case get({ch_pid, ChPid}) of + undefined -> undefined; + Channel -> erase({channel, Channel}), + erase({ch_pid, ChPid}), + Channel end. -all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid}, - {_Channel, {ch_fr_pid, ChFrPid}}} <- get()]. +all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()]. terminate_channels() -> NChannels = - length([rabbit_framing_channel:shutdown(ChFrPid) - || ChFrPid <- all_channels()]), + length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -495,10 +487,10 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'DOWN', _MRef, process, ChSupPid, Reason} -> - case channel_cleanup(ChSupPid) of + {'DOWN', _MRef, process, ChPid, Reason} -> + case channel_cleanup(ChPid) of undefined -> - exit({abnormal_dependent_exit, ChSupPid, Reason}); + exit({abnormal_dependent_exit, ChPid, Reason}); Channel -> case termination_kind(Reason) of controlled -> @@ -529,15 +521,13 @@ maybe_close(State) -> State. termination_kind(normal) -> controlled; -termination_kind(shutdown) -> controlled; -termination_kind({shutdown, _Term}) -> controlled; termination_kind(_) -> uncontrolled. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, connection = #connection{protocol = Protocol}}) when CS =:= closing; CS =:= closed -> - case analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State @@ -547,7 +537,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) State; handle_frame(Type, 0, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> - case analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; {method, MethodName, FieldsBin} -> @@ -556,19 +546,23 @@ handle_frame(Type, 0, Payload, end; handle_frame(Type, Channel, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> - case analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> case get({channel, Channel}) of - {ch_fr_pid, ChFrPid} -> - ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), + {ChPid, FramingState} -> + NewAState = process_channel_frame( + AnalyzedFrame, self(), + Channel, ChPid, FramingState), + put({channel, Channel}, {ChPid, NewAState}), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), State; {method, MethodName, _} -> - case (State#v1.connection_state =:= blocking andalso + case (State#v1.connection_state =:= blocking + andalso Protocol:method_has_content(MethodName)) of true -> State#v1{connection_state = blocked}; false -> State @@ -597,9 +591,8 @@ handle_frame(Type, Channel, Payload, State; undefined -> case ?IS_RUNNING(State) of - true -> ok = send_to_new_channel( - Channel, AnalyzedFrame, State), - State; + true -> send_to_new_channel( + Channel, AnalyzedFrame, State); false -> throw({channel_frame_while_starting, Channel, State#v1.connection_state, AnalyzedFrame}) @@ -607,22 +600,6 @@ handle_frame(Type, Channel, Payload, end end. -analyze_frame(?FRAME_METHOD, - <<ClassId:16, MethodId:16, MethodFields/binary>>, - Protocol) -> - MethodName = Protocol:lookup_method_name({ClassId, MethodId}), - {method, MethodName, MethodFields}; -analyze_frame(?FRAME_HEADER, - <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, - _Protocol) -> - {content_header, ClassId, Weight, BodySize, Properties}; -analyze_frame(?FRAME_BODY, Body, _Protocol) -> - {content_body, Body}; -analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> - heartbeat; -analyze_frame(_Type, _Body, _Protocol) -> - error. - handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, @@ -680,11 +657,12 @@ handle_input(Callback, Data, _State) -> start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Protocol, State = #v1{sock = Sock, connection = Connection}) -> - Start = #'connection.start'{ version_major = ProtocolMajor, - version_minor = ProtocolMinor, - server_properties = server_properties(), - mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }, + Start = #'connection.start'{ + version_major = ProtocolMajor, + version_minor = ProtocolMinor, + server_properties = server_properties(), + mechanisms = auth_mechanisms_binary(), + locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), switch_callback(State#v1{connection = Connection#connection{ timeout_sec = ?NORMAL_TIMEOUT, @@ -709,42 +687,45 @@ ensure_stats_timer(State) -> handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> - try - handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), - State) - catch exit:Reason -> - CompleteReason = case Reason of - #amqp_error{method = none} -> - Reason#amqp_error{method = MethodName}; - OtherReason -> OtherReason - end, + HandleException = + fun(R) -> case ?IS_RUNNING(State) of - true -> send_exception(State, 0, CompleteReason); + true -> send_exception(State, 0, R); %% We don't trust the client at this point - force %% them to wait for a bit so they can't DOS us with %% repeated failed logins etc. false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, - CompleteReason}) + throw({channel0_error, State#v1.connection_state, R}) end + end, + try + handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), + State) + catch exit:#amqp_error{method = none} = Reason -> + HandleException(Reason#amqp_error{method = MethodName}); + Type:Reason -> + HandleException({Type, Reason, MethodName, erlang:get_stacktrace()}) end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response, client_properties = ClientProperties}, - State = #v1{connection_state = starting, - connection = Connection = - #connection{protocol = Protocol}, - sock = Sock}) -> - User = rabbit_access_control:check_login(Mechanism, Response), - Tune = #'connection.tune'{channel_max = 0, - frame_max = server_frame_max(), - heartbeat = 0}, - ok = send_on_channel0(Sock, Tune, Protocol), - State#v1{connection_state = tuning, - connection = Connection#connection{ - user = User, - client_properties = ClientProperties}}; + State0 = #v1{connection_state = starting, + connection = Connection, + sock = Sock}) -> + AuthMechanism = auth_mechanism_to_module(Mechanism), + State = State0#v1{auth_mechanism = AuthMechanism, + auth_state = AuthMechanism:init(Sock), + connection_state = securing, + connection = + Connection#connection{ + client_properties = ClientProperties}}, + auth_phase(Response, State); + +handle_method0(#'connection.secure_ok'{response = Response}, + State = #v1{connection_state = securing}) -> + auth_phase(Response, State); + handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, @@ -761,17 +742,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ServerFrameMax]); true -> - SendFun = - fun() -> - Frame = rabbit_binary_generator:build_heartbeat_frame(), - catch rabbit_net:send(Sock, Frame) - end, - + Frame = rabbit_binary_generator:build_heartbeat_frame(), + SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), - ReceiveFun = - fun() -> - Parent ! timeout - end, + ReceiveFun = fun() -> Parent ! timeout end, Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, @@ -802,7 +776,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, fun() -> internal_emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> - lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), + lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection_state = CS, @@ -833,6 +807,61 @@ server_frame_max() -> send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). +auth_mechanism_to_module(TypeBin) -> + case rabbit_registry:binary_to_type(TypeBin) of + {error, not_found} -> + rabbit_misc:protocol_error( + command_invalid, "unknown authentication mechanism '~s'", + [TypeBin]); + T -> + case {lists:member(T, auth_mechanisms()), + rabbit_registry:lookup_module(auth_mechanism, T)} of + {true, {ok, Module}} -> + Module; + _ -> + rabbit_misc:protocol_error( + command_invalid, + "invalid authentication mechanism '~s'", [T]) + end + end. + +auth_mechanisms() -> + {ok, Configured} = application:get_env(auth_mechanisms), + [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism), + lists:member(Name, Configured)]. + +auth_mechanisms_binary() -> + list_to_binary( + string:join( + [atom_to_list(A) || A <- auth_mechanisms()], " ")). + +auth_phase(Response, + State = #v1{auth_mechanism = AuthMechanism, + auth_state = AuthState, + connection = Connection = + #connection{protocol = Protocol}, + sock = Sock}) -> + case AuthMechanism:handle_response(Response, AuthState) of + {refused, Msg, Args} -> + rabbit_misc:protocol_error( + access_refused, "~s login refused: ~s", + [proplists:get_value(name, AuthMechanism:description()), + io_lib:format(Msg, Args)]); + {protocol_error, Msg, Args} -> + rabbit_misc:protocol_error(syntax_error, Msg, Args); + {challenge, Challenge, AuthState1} -> + Secure = #'connection.secure'{challenge = Challenge}, + ok = send_on_channel0(Sock, Secure, Protocol), + State#v1{auth_state = AuthState1}; + {ok, User} -> + Tune = #'connection.tune'{channel_max = 0, + frame_max = server_frame_max(), + heartbeat = 0}, + ok = send_on_channel0(Sock, Tune, Protocol), + State#v1{connection_state = tuning, + connection = Connection#connection{user = User}} + end. + %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -849,6 +878,14 @@ i(peer_port, #v1{sock = Sock}) -> socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock); i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock); +i(ssl_protocol, #v1{sock = Sock}) -> + ssl_info(fun ({P, _}) -> P end, Sock); +i(ssl_key_exchange, #v1{sock = Sock}) -> + ssl_info(fun ({_, {K, _, _}}) -> K end, Sock); +i(ssl_cipher, #v1{sock = Sock}) -> + ssl_info(fun ({_, {_, C, _}}) -> C end, Sock); +i(ssl_hash, #v1{sock = Sock}) -> + ssl_info(fun ({_, {_, _, H}}) -> H end, Sock); i(peer_cert_issuer, #v1{sock = Sock}) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock); i(peer_cert_subject, #v1{sock = Sock}) -> @@ -870,6 +907,10 @@ i(protocol, #v1{connection = #connection{protocol = none}}) -> none; i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> Protocol:version(); +i(auth_mechanism, #v1{auth_mechanism = none}) -> + none; +i(auth_mechanism, #v1{auth_mechanism = Mechanism}) -> + proplists:get_value(name, Mechanism:description()); i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> @@ -895,6 +936,13 @@ socket_info(Get, Select) -> {error, _} -> '' end. +ssl_info(F, Sock) -> + case rabbit_net:ssl_info(Sock) of + nossl -> ''; + {error, _} -> ''; + {ok, Info} -> F(Info) + end. + cert_info(F, Sock) -> case rabbit_net:peercert(Sock) of nossl -> ''; @@ -909,17 +957,31 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> channel_sup_sup_pid = ChanSupSup, connection = #connection{protocol = Protocol, frame_max = FrameMax, - user = #user{username = Username}, + user = User, vhost = VHost}} = State, - {ok, ChSupPid, ChFrPid} = + {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, - self(), Username, VHost, Collector}), - erlang:monitor(process, ChSupPid), - put({channel, Channel}, {ch_fr_pid, ChFrPid}), - put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), - put({ch_fr_pid, ChFrPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame). + self(), User, VHost, Collector}), + erlang:monitor(process, ChPid), + NewAState = process_channel_frame(AnalyzedFrame, self(), + Channel, ChPid, AState), + put({channel, Channel}, {ChPid, NewAState}), + put({ch_pid, ChPid}, Channel), + State. + +process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> NewAState; + {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), + NewAState; + {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid, + Method, Content), + NewAState; + {error, Reason} -> ErrPid ! {channel_exit, Channel, + Reason}, + AState + end. log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_registry.erl index f15275b5..7a3fcb51 100644 --- a/src/rabbit_exchange_type_registry.erl +++ b/src/rabbit_registry.erl @@ -29,7 +29,7 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_exchange_type_registry). +-module(rabbit_registry). -behaviour(gen_server). @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/2, binary_to_type/1, lookup_module/1]). +-export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]). -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). @@ -46,11 +46,12 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(register/2 :: (binary(), atom()) -> 'ok'). +-spec(register/3 :: (atom(), binary(), atom()) -> 'ok'). -spec(binary_to_type/1 :: (binary()) -> atom() | rabbit_types:error('not_found')). --spec(lookup_module/1 :: - (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')). +-spec(lookup_module/2 :: + (atom(), atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')). +-spec(lookup_all/1 :: (atom()) -> [{atom(), atom()}]). -endif. @@ -61,8 +62,8 @@ start_link() -> %%--------------------------------------------------------------------------- -register(TypeName, ModuleName) -> - gen_server:call(?SERVER, {register, TypeName, ModuleName}). +register(Class, TypeName, ModuleName) -> + gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}). %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it @@ -74,47 +75,54 @@ binary_to_type(TypeBin) when is_binary(TypeBin) -> TypeAtom -> TypeAtom end. -lookup_module(T) when is_atom(T) -> - case ets:lookup(?ETS_NAME, T) of +lookup_module(Class, T) when is_atom(T) -> + case ets:lookup(?ETS_NAME, {Class, T}) of [{_, Module}] -> {ok, Module}; [] -> {error, not_found} end. +lookup_all(Class) -> + [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})]. + %%--------------------------------------------------------------------------- internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> list_to_atom(binary_to_list(TypeBin)). -internal_register(TypeName, ModuleName) - when is_binary(TypeName), is_atom(ModuleName) -> - ok = sanity_check_module(ModuleName), +internal_register(Class, TypeName, ModuleName) + when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) -> + ok = sanity_check_module(class_module(Class), ModuleName), true = ets:insert(?ETS_NAME, - {internal_binary_to_type(TypeName), ModuleName}), + {{Class, internal_binary_to_type(TypeName)}, ModuleName}), ok. -sanity_check_module(Module) -> - case catch lists:member(rabbit_exchange_type, +sanity_check_module(ClassModule, Module) -> + case catch lists:member(ClassModule, lists:flatten( [Bs || {Attr, Bs} <- Module:module_info(attributes), Attr =:= behavior orelse Attr =:= behaviour])) of {'EXIT', {undef, _}} -> {error, not_module}; - false -> {error, not_exchange_type}; + false -> {error, {not_type, ClassModule}}; true -> ok end. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism. + %%--------------------------------------------------------------------------- init([]) -> ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), {ok, none}. -handle_call({register, TypeName, ModuleName}, _From, State) -> - ok = internal_register(TypeName, ModuleName), +handle_call({register, Class, TypeName, ModuleName}, _From, State) -> + ok = internal_register(Class, TypeName, ModuleName), {reply, ok, State}; + handle_call(Request, _From, State) -> {stop, {unhandled_call, Request}, State}. diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index 1d8ce23b..a4da23e2 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -36,6 +36,7 @@ -include_lib("public_key/include/public_key.hrl"). -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). +-export([peer_cert_subject_item/2]). %%-------------------------------------------------------------------------- @@ -45,9 +46,11 @@ -type(certificate() :: binary()). --spec(peer_cert_issuer/1 :: (certificate()) -> string()). --spec(peer_cert_subject/1 :: (certificate()) -> string()). --spec(peer_cert_validity/1 :: (certificate()) -> string()). +-spec(peer_cert_issuer/1 :: (certificate()) -> string()). +-spec(peer_cert_subject/1 :: (certificate()) -> string()). +-spec(peer_cert_validity/1 :: (certificate()) -> string()). +-spec(peer_cert_subject_item/2 :: + (certificate(), tuple()) -> string() | 'not_found'). -endif. @@ -71,6 +74,14 @@ peer_cert_subject(Cert) -> format_rdn_sequence(Subject) end, Cert). +%% Return a part of the certificate's subject. +peer_cert_subject_item(Cert, Type) -> + cert_info(fun(#'OTPCertificate' { + tbsCertificate = #'OTPTBSCertificate' { + subject = Subject }}) -> + find_by_type(Type, Subject) + end, Cert). + %% Return a string describing the certificate's validity. peer_cert_validity(Cert) -> cert_info(fun(#'OTPCertificate' { @@ -89,6 +100,14 @@ cert_info(F, Cert) -> DecCert -> DecCert %%R14B onwards end). +find_by_type(Type, {rdnSequence, RDNs}) -> + case [V || #'AttributeTypeAndValue'{type = T, value = V} + <- lists:flatten(RDNs), + T == Type] of + [{printableString, S}] -> S; + [] -> not_found + end. + %%-------------------------------------------------------------------------- %% Formatting functions %%-------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index adf968cb..d913092c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -96,6 +96,22 @@ run_cluster_dependent_tests(SecondaryNode) -> passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), + %% we now run the tests remotely, so that code coverage on the + %% local node picks up more of the delegate + Node = node(), + Self = self(), + Remote = spawn(SecondaryNode, + fun () -> A = test_delegates_async(Node), + B = test_delegates_sync(Node), + Self ! {self(), {A, B}} + end), + receive + {Remote, Result} -> + Result = {passed, passed} + after 2000 -> + throw(timeout) + end, + passed. test_priority_queue() -> @@ -1014,7 +1030,7 @@ test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - <<"user">>, <<"/">>, self(), + user(<<"user">>), <<"/">>, self(), fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- @@ -1074,7 +1090,7 @@ test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, - <<"guest">>, <<"/">>, self(), + user(<<"guest">>), <<"/">>, self(), fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok @@ -1082,6 +1098,13 @@ test_spawn(Receiver) -> end, {Writer, Ch}. +user(Username) -> + #user{username = Username, + is_admin = true, + auth_backend = rabbit_auth_backend_internal, + impl = #internal_user{username = Username, + is_admin = true}}. + test_statistics_receiver(Pid) -> receive shutdown -> @@ -1247,15 +1270,26 @@ test_delegates_sync(SecondaryNode) -> true = lists:all(fun ({_, response}) -> true end, GoodRes), GoodResPids = [Pid || {Pid, _} <- GoodRes], - Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids), - Good = ordsets:from_list(GoodResPids), + Good = lists:usort(LocalGoodPids ++ RemoteGoodPids), + Good = lists:usort(GoodResPids), {[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender), true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes), BadResPids = [Pid || {Pid, _} <- BadRes], - Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids), - Bad = ordsets:from_list(BadResPids), + Bad = lists:usort(LocalBadPids ++ RemoteBadPids), + Bad = lists:usort(BadResPids), + + MagicalPids = [rabbit_misc:string_to_pid(Str) || + Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]], + {[], BadNodes} = delegate:invoke(MagicalPids, Sender), + true = lists:all( + fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end, + BadNodes), + BadNodesPids = [Pid || {Pid, _} <- BadNodes], + + Magical = lists:usort(MagicalPids), + Magical = lists:usort(BadNodesPids), passed. @@ -1662,7 +1696,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> false -> ?TRANSIENT_MSG_STORE end, MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined), - {A, B} = + {A, B = [{_SeqId, LastGuidWritten} | _]} = lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> Guid = rabbit_guid:guid(), @@ -1671,6 +1705,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> ok = rabbit_msg_store:write(Guid, Guid, MSCState), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]} end, {Qi, []}, SeqIds), + %% do this just to force all of the publishes through to the msg_store: + true = rabbit_msg_store:contains(LastGuidWritten, MSCState), ok = rabbit_msg_store:client_delete_and_terminate(MSCState), {A, B}. @@ -1854,7 +1890,7 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), VQ = rabbit_variable_queue:init(test_queue(), true, false, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1956,7 +1992,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), + VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1966,7 +2002,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2000,7 +2036,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2030,7 +2066,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2047,7 +2083,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2078,7 +2114,7 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = rabbit_variable_queue:init(QName, true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), @@ -2140,3 +2176,4 @@ test_configurable_server_properties() -> passed. nop(_) -> ok. +nop(_, _) -> ok. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 548014be..70d18d7a 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -42,8 +42,9 @@ vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, - connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, - ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, + connection/0, protocol/0, user/0, internal_user/0, + username/0, password/0, password_hash/0, ok/1, error/1, + ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, connection_exit/0]). -type(channel_exit() :: no_return()). @@ -151,9 +152,19 @@ -type(protocol() :: rabbit_framing:protocol()). -type(user() :: - #user{username :: rabbit_access_control:username(), - password_hash :: rabbit_access_control:password_hash(), - is_admin :: boolean()}). + #user{username :: username(), + is_admin :: boolean(), + auth_backend :: atom(), + impl :: any()}). + +-type(internal_user() :: + #internal_user{username :: username(), + password_hash :: password_hash(), + is_admin :: boolean()}). + +-type(username() :: binary()). +-type(password() :: binary()). +-type(password_hash() :: binary()). -type(ok(A) :: {'ok', A}). -type(error(A) :: {'error', A}). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 1c56d51d..b5ff2b12 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -27,6 +27,8 @@ -rabbit_upgrade({remove_user_scope, []}). -rabbit_upgrade({hash_passwords, []}). -rabbit_upgrade({add_ip_to_listener, []}). +-rabbit_upgrade({internal_exchanges, []}). +-rabbit_upgrade({user_to_internal_user, [hash_passwords]}). %% ------------------------------------------------------------------- @@ -35,6 +37,8 @@ -spec(remove_user_scope/0 :: () -> 'ok'). -spec(hash_passwords/0 :: () -> 'ok'). -spec(add_ip_to_listener/0 :: () -> 'ok'). +-spec(internal_exchanges/0 :: () -> 'ok'). +-spec(user_to_internal_user/0 :: () -> 'ok'). -endif. @@ -58,7 +62,7 @@ hash_passwords() -> mnesia( rabbit_user, fun ({user, Username, Password, IsAdmin}) -> - Hash = rabbit_access_control:hash_password(Password), + Hash = rabbit_auth_backend_internal:hash_password(Password), {user, Username, Hash, IsAdmin} end, [username, password_hash, is_admin]). @@ -71,8 +75,33 @@ add_ip_to_listener() -> end, [node, protocol, host, ip_address, port]). +internal_exchanges() -> + Tables = [rabbit_exchange, rabbit_durable_exchange], + AddInternalFun = + fun ({exchange, Name, Type, Durable, AutoDelete, Args}) -> + {exchange, Name, Type, Durable, AutoDelete, false, Args} + end, + [ ok = mnesia(T, + AddInternalFun, + [name, type, durable, auto_delete, internal, arguments]) + || T <- Tables ], + ok. + +user_to_internal_user() -> + mnesia( + rabbit_user, + fun({user, Username, PasswordHash, IsAdmin}) -> + {internal_user, Username, PasswordHash, IsAdmin} + end, + [username, password_hash, is_admin], internal_user). + %%-------------------------------------------------------------------- mnesia(TableName, Fun, FieldList) -> {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), ok. + +mnesia(TableName, Fun, FieldList, NewRecordName) -> + {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, + NewRecordName), + ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0db51165..665cac96 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -323,7 +323,7 @@ timestamp :: timestamp() }). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), - count :: non_neg_integer (), + count :: non_neg_integer(), end_seq_id :: non_neg_integer() }). -type(sync() :: #sync { acks_persistent :: [[seq_id()]], @@ -412,7 +412,9 @@ stop_msg_store() -> init(QueueName, IsDurable, Recover) -> Self = self(), init(QueueName, IsDurable, Recover, - fun (Guids) -> msgs_written_to_disk(Self, Guids) end, + fun (Guids, ActionTaken) -> + msgs_written_to_disk(Self, Guids, ActionTaken) + end, fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> @@ -519,7 +521,9 @@ publish(Msg, MsgProps, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> +publish_delivered(false, #basic_message { guid = Guid }, + _MsgProps, State = #vqstate { len = 0 }) -> + blind_confirm(self(), gb_sets:singleton(Guid)), {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, @@ -531,20 +535,20 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - unconfirmed = Unconfirmed }) -> + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), + UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), {SeqId, a(reduce_memory_use( State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, persistent_count = PCount1, - unconfirmed = Unconfirmed1 }))}. + unconfirmed = UC1 }))}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -654,15 +658,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - {Guids, State1} = - ack(fun msg_store_remove/3, - fun ({_IsPersistent, Guid, _MsgProps}, State1) -> - remove_confirms(gb_sets:singleton(Guid), State1); - (#msg_status{msg = #basic_message{guid = Guid}}, State1) -> - remove_confirms(gb_sets:singleton(Guid), State1) - end, - AckTags, State), - {Guids, a(State1)}. + a(ack(fun msg_store_remove/3, + fun (_, State0) -> State0 end, + AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, @@ -712,7 +710,7 @@ tx_commit(Txn, Fun, MsgPropsFun, end)}. requeue(AckTags, MsgPropsFun, State) -> - {_Guids, State1} = + a(reduce_memory_use( ack(fun msg_store_release/3, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), @@ -727,8 +725,7 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State), - a(reduce_memory_use(State1)). + AckTags, State))). len(#vqstate { len = Len }) -> Len. @@ -812,17 +809,22 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State), - Res; -needs_idle_timeout(_State) -> - true. +needs_idle_timeout(State = #vqstate { on_sync = OnSync }) -> + case {OnSync, needs_index_sync(State)} of + {?BLANK_SYNC, false} -> + {Res, _State} = reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State), + Res; + _ -> + true + end. -idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). +idle_timeout(State) -> + a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -1096,9 +1098,9 @@ blank_rate(Timestamp, IngressLength) -> msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, - Fun, MsgPropsFun, StateN) + Self, fun (StateN) -> {[], tx_commit_post_msg_store( + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN)} end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( @@ -1160,7 +1162,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - {_Guids, NewState} = ack(Acks, State), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = @@ -1172,7 +1173,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { {SeqId, State3} = publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, NewState}, Pubs), + end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1236,7 +1237,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, persistent_count = PCount, durable = IsDurable, ram_msg_count = RamMsgCount, - unconfirmed = Unconfirmed }) -> + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, @@ -1246,13 +1247,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), + UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, ram_msg_count = RamMsgCount + 1, - unconfirmed = Unconfirmed1 }}. + unconfirmed = UC1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> @@ -1311,10 +1312,8 @@ record_pending_ack(#msg_status { seq_id = SeqId, ack_in_counter = AckInCount}) -> {AckEntry, RAI1} = case MsgOnDisk of - true -> - {{IsPersistent, Guid, MsgProps}, RAI}; - false -> - {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} + true -> {{IsPersistent, Guid, MsgProps}, RAI}; + false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} end, PA1 = dict:store(SeqId, AckEntry, PA), State #vqstate { pending_ack = PA1, @@ -1325,8 +1324,8 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, - {[], orddict:new()}, PA), + {PersistentSeqIds, GuidsByStore} = + dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, case KeepPersistent of @@ -1336,18 +1335,17 @@ remove_pending_ack(KeepPersistent, Guids), State1 end; - false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold( - fun (IsPersistent, Guids, ok) -> - msg_store_remove(MSCState, IsPersistent, Guids) - end, ok, GuidsByStore), + false -> IndexState1 = + rabbit_queue_index:ack(PersistentSeqIds, IndexState), + [ok = msg_store_remove(MSCState, IsPersistent, Guids) + || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], State1 #vqstate { index_state = IndexState1 } end. ack(_MsgStoreFun, _Fun, [], State) -> - {[], State}; + State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{SeqIds, GuidsByStore}, + {{PersistentSeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1361,27 +1359,27 @@ ack(MsgStoreFun, Fun, AckTags, State) -> pending_ack = dict:erase(SeqId, PA), ram_ack_index = gb_trees:delete_any(SeqId, RAI)})} - end, {{[], orddict:new()}, State}, AckTags), - IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - AckdGuids = lists:concat( - orddict:fold( - fun (IsPersistent, Guids, Gs) -> - MsgStoreFun(MSCState, IsPersistent, Guids), - [Guids | Gs] - end, [], GuidsByStore)), + end, {accumulate_ack_init(), State}, AckTags), + IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), + [ok = MsgStoreFun(MSCState, IsPersistent, Guids) + || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - {AckdGuids, State1 #vqstate { index_state = IndexState1, + State1 #vqstate { index_state = IndexState1, persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }}. + ack_out_counter = AckOutCount + length(AckTags) }. + +accumulate_ack_init() -> {[], orddict:new()}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false }, Acc) -> - Acc; -accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) -> - {cons_if(IsPersistent, SeqId, SeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}. + index_on_disk = false }, + {PersistentSeqIdsAcc, GuidsByStore}) -> + {PersistentSeqIdsAcc, GuidsByStore}; +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, + {PersistentSeqIdsAcc, GuidsByStore}) -> + {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), + rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1393,6 +1391,13 @@ find_persistent_count(LensByStore) -> %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- +confirm_commit_index(State = #vqstate { index_state = IndexState }) -> + case needs_index_sync(State) of + true -> State #vqstate { + index_state = rabbit_queue_index:sync(IndexState) }; + false -> State + end. + remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> @@ -1400,14 +1405,35 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), unconfirmed = gb_sets:difference(UC, GuidSet) }. +needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + %% If UC is empty then by definition, MIOD and MOD are also empty + %% and there's nothing that can be pending a sync. + + %% If UC is not empty, then we want to find is_empty(UC - MIOD), + %% but the subtraction can be expensive. Thus instead, we test to + %% see if UC is a subset of MIOD. This can only be the case if + %% MIOD == UC, which would indicate that every message in UC is + %% also in MIOD and is thus _all_ pending on a msg_store sync, not + %% on a qi sync. Thus the negation of this is sufficient. Because + %% is_subset is short circuiting, this is more efficient than the + %% subtraction. + not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). + msgs_confirmed(GuidSet, State) -> - {{confirm, gb_sets:to_list(GuidSet)}, remove_confirms(GuidSet, State)}. + {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. + +blind_confirm(QPid, GuidSet) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun (State) -> msgs_confirmed(GuidSet, State) end). -msgs_written_to_disk(QPid, GuidSet) -> +msgs_written_to_disk(QPid, GuidSet, removed) -> + blind_confirm(QPid, GuidSet); +msgs_written_to_disk(QPid, GuidSet, written) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun(State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + QPid, fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), State #vqstate { msgs_on_disk = @@ -1417,9 +1443,9 @@ msgs_written_to_disk(QPid, GuidSet) -> msg_indices_written_to_disk(QPid, GuidSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun(State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + QPid, fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> msgs_confirmed(gb_sets:intersection(GuidSet, MOD), State #vqstate { msg_indices_on_disk = diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl new file mode 100644 index 00000000..f939a3fe --- /dev/null +++ b/src/rabbit_vhost.erl @@ -0,0 +1,122 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_vhost). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-export([add/1, delete/1, exists/1, list/0, with/2]). + +-ifdef(use_specs). + +-spec(add/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(delete/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(exists/1 :: (rabbit_types:vhost()) -> boolean()). +-spec(list/0 :: () -> [rabbit_types:vhost()]). +-spec(with/2 :: (rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +add(VHostPath) -> + R = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_vhost, VHostPath}) of + [] -> + ok = mnesia:write(rabbit_vhost, + #vhost{virtual_host = VHostPath}, + write), + [rabbit_exchange:declare( + rabbit_misc:r(VHostPath, exchange, Name), + Type, true, false, false, []) || + {Name,Type} <- + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}]], + ok; + [_] -> + mnesia:abort({vhost_already_exists, VHostPath}) + end + end), + rabbit_log:info("Added vhost ~p~n", [VHostPath]), + R. + +delete(VHostPath) -> + %%FIXME: We are forced to delete the queues outside the TX below + %%because queue deletion involves sending messages to the queue + %%process, which in turn results in further mnesia actions and + %%eventually the termination of that process. + lists:foreach(fun (Q) -> + {ok,_} = rabbit_amqqueue:delete(Q, false, false) + end, + rabbit_amqqueue:list(VHostPath)), + R = rabbit_misc:execute_mnesia_transaction( + with(VHostPath, fun () -> + ok = internal_delete(VHostPath) + end)), + rabbit_log:info("Deleted vhost ~p~n", [VHostPath]), + R. + +internal_delete(VHostPath) -> + lists:foreach(fun (#exchange{name = Name}) -> + ok = rabbit_exchange:delete(Name, false) + end, + rabbit_exchange:list(VHostPath)), + lists:foreach( + fun ({Username, _, _, _}) -> + ok = rabbit_auth_backend_internal:clear_permissions(Username, + VHostPath) + end, + rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)), + ok = mnesia:delete({rabbit_vhost, VHostPath}), + ok. + +exists(VHostPath) -> + mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. + +list() -> + mnesia:dirty_all_keys(rabbit_vhost). + +with(VHostPath, Thunk) -> + fun () -> + case mnesia:read({rabbit_vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [_V] -> + Thunk() + end + end. |