summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--docs/rabbitmqctl.1.xml63
-rw-r--r--ebin/rabbit_app.in5
-rw-r--r--include/rabbit.hrl10
-rw-r--r--include/rabbit_auth_backend_spec.hrl46
-rw-r--r--include/rabbit_auth_mechanism_spec.hrl41
-rw-r--r--include/rabbit_backing_queue_spec.hrl2
-rwxr-xr-xscripts/rabbitmq-env2
-rwxr-xr-xscripts/rabbitmq-multi3
-rw-r--r--scripts/rabbitmq-multi.bat2
-rwxr-xr-xscripts/rabbitmq-server3
-rw-r--r--scripts/rabbitmq-server.bat1
-rwxr-xr-xscripts/rabbitmqctl3
-rw-r--r--scripts/rabbitmqctl.bat2
-rw-r--r--src/delegate.erl209
-rw-r--r--src/delegate_sup.erl13
-rw-r--r--src/rabbit.erl20
-rw-r--r--src/rabbit_access_control.erl466
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_amqqueue_process.erl215
-rw-r--r--src/rabbit_auth_backend.erl76
-rw-r--r--src/rabbit_auth_backend_internal.erl347
-rw-r--r--src/rabbit_auth_mechanism.erl57
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl70
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl74
-rw-r--r--src/rabbit_auth_mechanism_external.erl107
-rw-r--r--src/rabbit_auth_mechanism_plain.erl66
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_binary_generator.erl3
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl296
-rw-r--r--src/rabbit_channel_sup.erl17
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_command_assembler.erl148
-rw-r--r--src/rabbit_connection_sup.erl1
-rw-r--r--src/rabbit_control.erl35
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl35
-rw-r--r--src/rabbit_exchange_type_direct.erl6
-rw-r--r--src/rabbit_exchange_type_fanout.erl6
-rw-r--r--src/rabbit_exchange_type_headers.erl6
-rw-r--r--src/rabbit_exchange_type_topic.erl6
-rw-r--r--src/rabbit_framing_channel.erl129
-rw-r--r--src/rabbit_misc.erl40
-rw-r--r--src/rabbit_mnesia.erl32
-rw-r--r--src/rabbit_msg_store.erl280
-rw-r--r--src/rabbit_net.erl10
-rw-r--r--src/rabbit_networking.erl3
-rw-r--r--src/rabbit_prelaunch.erl21
-rw-r--r--src/rabbit_queue_index.erl26
-rw-r--r--src/rabbit_reader.erl292
-rw-r--r--src/rabbit_registry.erl (renamed from src/rabbit_exchange_type_registry.erl)44
-rw-r--r--src/rabbit_ssl.erl25
-rw-r--r--src/rabbit_tests.erl65
-rw-r--r--src/rabbit_types.erl21
-rw-r--r--src/rabbit_upgrade_functions.erl31
-rw-r--r--src/rabbit_variable_queue.erl170
-rw-r--r--src/rabbit_vhost.erl122
58 files changed, 2439 insertions, 1360 deletions
diff --git a/Makefile b/Makefile
index e0d5744c..00bfd629 100644
--- a/Makefile
+++ b/Makefile
@@ -170,7 +170,7 @@ start-background-node:
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \
- ./scripts/rabbitmq-server ; sleep 1
+ ./scripts/rabbitmq-server; sleep 1
start-rabbit-on-node: all
echo "rabbit:start()." | $(ERL_CALL)
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6b02abe4..2152cab3 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -398,7 +398,12 @@
<refsect2>
<title>User management</title>
-
+ <para>
+ Note that <command>rabbitmqctl</command> manages the RabbitMQ
+ internal user database. Users from any alternative
+ authentication backend will not be visible
+ to <command>rabbitmqctl</command>.
+ </para>
<variablelist>
<varlistentry>
<term><cmdsynopsis><command>add_user</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>password</replaceable></arg></cmdsynopsis></term>
@@ -466,6 +471,25 @@
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>clear_password</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user whose password is to be cleared.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_password tonyg</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to clear the
+ password for the user named
+ <command>tonyg</command>. This user now cannot log in with a password (but may be able to through e.g. SASL EXTERNAL if configured).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
@@ -525,7 +549,12 @@
<refsect2>
<title>Access control</title>
-
+ <para>
+ Note that <command>rabbitmqctl</command> manages the RabbitMQ
+ internal user database. Permissions for users from any
+ alternative authorisation backend will not be visible
+ to <command>rabbitmqctl</command>.
+ </para>
<variablelist>
<varlistentry>
<term><cmdsynopsis><command>add_vhost</command> <arg choice="req"><replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
@@ -847,6 +876,10 @@
<listitem><para>Whether the exchange will be deleted automatically when no longer used.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>internal</term>
+ <listitem><para>Whether the exchange is internal, i.e. cannot be directly published to by a client.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>arguments</term>
<listitem><para>Exchange arguments.</para></listitem>
</varlistentry>
@@ -977,6 +1010,26 @@
connection is secured with SSL.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>ssl_protocol</term>
+ <listitem><para>SSL protocol
+ (e.g. tlsv1)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_key_exchange</term>
+ <listitem><para>SSL key exchange algorithm
+ (e.g. rsa)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_cipher</term>
+ <listitem><para>SSL cipher algorithm
+ (e.g. aes_256_cbc)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_hash</term>
+ <listitem><para>SSL hash function
+ (e.g. sha)</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>peer_cert_subject</term>
<listitem><para>The subject of the peer's SSL
certificate, in RFC4514 form.</para></listitem>
@@ -1005,6 +1058,10 @@
<listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>auth_mechanism</term>
+ <listitem><para>SASL authentication mechanism used, such as <command>PLAIN</command>.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>user</term>
<listitem><para>Username associated with the connection.</para></listitem>
</varlistentry>
@@ -1054,7 +1111,7 @@
<para role="example-prefix">
For example:
</para>
- <screen role="example">rabbitmqctl list_connections send_pend server_port</screen>
+ <screen role="example">rabbitmqctl list_connections send_pend port</screen>
<para role="example">
This command displays the send queue size and server port for each
connection.
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index d3808a54..1f25338d 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -31,4 +31,7 @@
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
{cluster_nodes, []},
{server_properties, []},
- {collect_statistics, none}]}]}.
+ {collect_statistics, none},
+ {auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
+ {auth_backends, [rabbit_auth_backend_internal]},
+ {delegate_count, 16}]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index fccfad97..81c3996b 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -29,7 +29,13 @@
%% Contributor(s): ______________________________________.
%%
--record(user, {username, password_hash, is_admin}).
+-record(user, {username,
+ is_admin,
+ auth_backend, %% Module this user came from
+ impl %% Scratch space for that module
+ }).
+
+-record(internal_user, {username, password_hash, is_admin}).
-record(permission, {configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
@@ -51,7 +57,7 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, arguments}).
+-record(exchange, {name, type, durable, auto_delete, internal, arguments}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
arguments, pid}).
diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl
new file mode 100644
index 00000000..a96c18d8
--- /dev/null
+++ b/include/rabbit_auth_backend_spec.hrl
@@ -0,0 +1,46 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-ifdef(use_specs).
+
+-spec(description/0 :: () -> [{atom(), any()}]).
+
+-spec(check_user_login/2 :: (rabbit_types:username(), [term()]) ->
+ {'ok', rabbit_types:user()} |
+ {'refused', string(), [any()]} |
+ {'error', any()}).
+-spec(check_vhost_access/3 :: (rabbit_types:user(), rabbit_types:vhost(),
+ rabbit_access_control:vhost_permission_atom()) ->
+ boolean() | {'error', any()}).
+-spec(check_resource_access/3 :: (rabbit_types:user(),
+ rabbit_types:r(atom()),
+ rabbit_access_control:permission_atom()) ->
+ boolean() | {'error', any()}).
+-endif.
diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl
new file mode 100644
index 00000000..f8dc93fe
--- /dev/null
+++ b/include/rabbit_auth_mechanism_spec.hrl
@@ -0,0 +1,41 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-ifdef(use_specs).
+
+-spec(description/0 :: () -> [{atom(), any()}]).
+-spec(init/1 :: (rabbit_net:socket()) -> any()).
+-spec(handle_response/2 :: (binary(), any()) ->
+ {'ok', rabbit_types:user()} |
+ {'challenge', binary(), any()} |
+ {'protocol_error', string(), [any()]} |
+ {'refused', string(), [any()]}).
+
+-endif.
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index f67c6f46..6fa34ccc 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -58,7 +58,7 @@
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
--spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
rabbit_types:message_properties(), state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 36734874..8cb470d0 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -48,6 +48,8 @@ done
SCRIPT_DIR=`dirname $SCRIPT_PATH`
RABBITMQ_HOME="${SCRIPT_DIR}/.."
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname`
+NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 59050692..33883702 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -29,8 +29,7 @@
##
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
+
SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index a4f8c8b4..ec61dc99 100644
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -89,7 +89,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
-pa "!TDP0!..\ebin" ^
-noinput -hidden ^
!RABBITMQ_MULTI_ERL_ARGS! ^
--sname rabbitmq_multi ^
+-sname rabbitmq_multi!RANDOM! ^
!RABBITMQ_CONFIG_ARG! ^
-s rabbit_multi ^
!RABBITMQ_MULTI_START_ARGS! ^
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 66ce4384..4155b31d 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -30,8 +30,6 @@
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
@@ -92,6 +90,7 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
-noinput \
-hidden \
-s rabbit_prelaunch \
+ -sname rabbitmqprelaunch$$ \
-extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}"
then
RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 872c87e3..ec5b4d85 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -118,6 +118,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
+-sname rabbitmqprelaunch!RANDOM! ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 76ce25fd..56cff891 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,9 +30,6 @@
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
-
. `dirname $0`/rabbitmq-env
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 563b9e58..4ffde73f 100644
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -58,7 +58,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/delegate.erl b/src/delegate.erl
index 11abe73b..10054e57 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -31,11 +31,9 @@
-module(delegate).
--define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2).
-
-behaviour(gen_server2).
--export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]).
+-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -44,13 +42,16 @@
-ifdef(use_specs).
--spec(start_link/2 ::
- (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 ::
+ (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
--spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
+-spec(invoke/2 ::
+ ( pid(), fun ((pid()) -> A)) -> A;
+ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
+ [{pid(), term()}]}).
--spec(process_count/0 :: () -> non_neg_integer()).
+-spec(delegate_count/0 :: () -> non_neg_integer()).
-endif.
@@ -61,157 +62,113 @@
%%----------------------------------------------------------------------------
-start_link(Prefix, Hash) ->
- gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []).
+start_link(Num) ->
+ gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ Fun(Pid);
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
- case Res of
- {ok, Result, _} ->
+ case invoke([Pid], Fun) of
+ {[{Pid, Result}], []} ->
Result;
- {error, {Class, Reason, StackTrace}, _} ->
+ {[], [{Pid, {Class, Reason, StackTrace}}]} ->
erlang:raise(Class, Reason, StackTrace)
end;
invoke(Pids, Fun) when is_list(Pids) ->
- lists:foldl(
- fun ({Status, Result, Pid}, {Good, Bad}) ->
- case Status of
- ok -> {[{Pid, Result}|Good], Bad};
- error -> {Good, [{Pid, Result}|Bad]}
- end
+ {LocalPids, Grouped} = group_pids_by_node(Pids),
+ %% The use of multi_call is only safe because the timeout is
+ %% infinity, and thus there is no process spawned in order to do
+ %% the sending. Thus calls can't overtake preceding calls/casts.
+ {Replies, BadNodes} =
+ case orddict:fetch_keys(Grouped) of
+ [] -> {[], []};
+ RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(),
+ {invoke, Fun, Grouped},
+ infinity)
end,
- {[], []},
- invoke_per_node(split_delegate_per_node(Pids), Fun)).
+ BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
+ BadNode <- BadNodes,
+ Pid <- orddict:fetch(BadNode, Grouped)],
+ ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) |
+ [Results || {_Node, Results} <- Replies]]),
+ lists:foldl(
+ fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
+ ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
+ end, {[], BadPids}, ResultsNoNode).
-invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
+invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ safe_invoke(Pid, Fun), %% we don't care about any error
ok;
+invoke_no_result(Pid, Fun) when is_pid(Pid) ->
+ invoke_no_result([Pid], Fun);
invoke_no_result(Pids, Fun) when is_list(Pids) ->
- invoke_no_result_per_node(split_delegate_per_node(Pids), Fun),
+ {LocalPids, Grouped} = group_pids_by_node(Pids),
+ case orddict:fetch_keys(Grouped) of
+ [] -> ok;
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(),
+ {invoke, Fun, Grouped})
+ end,
+ safe_invoke(LocalPids, Fun), %% must not die
ok.
%%----------------------------------------------------------------------------
-internal_call(Node, Thunk) when is_atom(Node) ->
- gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity).
-
-internal_cast(Node, Thunk) when is_atom(Node) ->
- gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
-
-split_delegate_per_node(Pids) ->
+group_pids_by_node(Pids) ->
LocalNode = node(),
- {Local, Remote} =
- lists:foldl(
- fun (Pid, {L, D}) ->
- Node = node(Pid),
- case Node of
- LocalNode -> {[Pid|L], D};
- _ -> {L, orddict:append(Node, Pid, D)}
- end
- end,
- {[], orddict:new()}, Pids),
- {Local, orddict:to_list(Remote)}.
-
-invoke_per_node(NodePids, Fun) ->
- lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-
-invoke_no_result_per_node(NodePids, Fun) ->
- delegate_per_node(NodePids, Fun, fun internal_cast/2),
- ok.
-
-delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
- %% In the case where DelegateFun is internal_cast, the safe_invoke
- %% is not actually async! However, in practice Fun will always be
- %% something that does a gen_server:cast or similar, so I don't
- %% think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- [safe_invoke(LocalPids, Fun)|
- delegate_per_remote_node(NodePids, Fun, DelegateFun)].
-
-delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
- Self = self(),
- %% Note that this is unsafe if the Fun requires reentrancy to the
- %% local_server. I.e. if self() == local_server(Node) then we'll
- %% block forever.
- [gen_server2:cast(
- local_server(Node),
- {thunk, fun () ->
- Self ! {result,
- DelegateFun(
- Node, fun () -> safe_invoke(Pids, Fun) end)}
- end}) || {Node, Pids} <- NodePids],
- [receive {result, Result} -> Result end || _ <- NodePids].
-
-local_server(Node) ->
- case get({delegate_local_server_name, Node}) of
- undefined ->
- Name = server(outgoing,
- erlang:phash2({self(), Node}, process_count())),
- put({delegate_local_server_name, Node}, Name),
- Name;
- Name -> Name
- end.
-
-remote_server(Node) ->
- case get({delegate_remote_server_name, Node}) of
- undefined ->
- case rpc:call(Node, delegate, process_count, []) of
- {badrpc, _} ->
- %% Have to return something, if we're just casting
- %% then we don't want to blow up
- server(incoming, 1);
- Count ->
- Name = server(incoming,
- erlang:phash2({self(), Node}, Count)),
- put({delegate_remote_server_name, Node}, Name),
- Name
- end;
- Name -> Name
+ lists:foldl(
+ fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode ->
+ {[Pid | Local], Remote};
+ (Pid, {Local, Remote}) ->
+ {Local,
+ orddict:update(
+ node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
+ end, {[], orddict:new()}, Pids).
+
+delegate_count() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ Count.
+
+delegate_name(Hash) ->
+ list_to_atom("delegate_" ++ integer_to_list(Hash)).
+
+delegate() ->
+ case get(delegate) of
+ undefined -> Name = delegate_name(
+ erlang:phash2(self(), delegate_count())),
+ put(delegate, Name),
+ Name;
+ Name -> Name
end.
-server(Prefix, Hash) ->
- list_to_atom("delegate_" ++
- atom_to_list(Prefix) ++ "_" ++
- integer_to_list(Hash)).
-
safe_invoke(Pids, Fun) when is_list(Pids) ->
[safe_invoke(Pid, Fun) || Pid <- Pids];
safe_invoke(Pid, Fun) when is_pid(Pid) ->
try
- {ok, Fun(Pid), Pid}
- catch
- Class:Reason ->
- {error, {Class, Reason, erlang:get_stacktrace()}, Pid}
+ {ok, Pid, Fun(Pid)}
+ catch Class:Reason ->
+ {error, Pid, {Class, Reason, erlang:get_stacktrace()}}
end.
-process_count() ->
- ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers).
-
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
init([]) ->
- {ok, no_state, hibernate,
+ {ok, node(), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-%% We don't need a catch here; we always go via safe_invoke. A catch here would
-%% be the wrong thing anyway since the Thunk can throw multiple errors.
-handle_call({thunk, Thunk}, _From, State) ->
- {reply, Thunk(), State, hibernate}.
+handle_call({invoke, Fun, Grouped}, _From, Node) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-handle_cast({thunk, Thunk}, State) ->
- Thunk(),
- {noreply, State, hibernate}.
+handle_cast({invoke, Fun, Grouped}, Node) ->
+ safe_invoke(orddict:fetch(Node, Grouped), Fun),
+ {noreply, Node, hibernate}.
-handle_info(_Info, State) ->
- {noreply, State, hibernate}.
+handle_info(_Info, Node) ->
+ {noreply, Node, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
+code_change(_OldVsn, Node, _Extra) ->
+ {ok, Node}.
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 544546f1..d2af72af 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -55,11 +55,8 @@ start_link() ->
%%----------------------------------------------------------------------------
init(_Args) ->
- {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}.
-
-specs(Prefix) ->
- [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]},
- transient, 16#ffffffff, worker, [delegate]} ||
- Hash <- lists:seq(0, delegate:process_count() - 1)].
-
-%%----------------------------------------------------------------------------
+ DCount = delegate:delegate_count(),
+ {ok, {{one_for_one, 10, 10},
+ [{Num, {delegate, start_link, [Num]},
+ transient, 16#ffffffff, worker, [delegate]} ||
+ Num <- lists:seq(0, DCount - 1)]}}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ace8f286..954e289b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -69,10 +69,10 @@
-rabbit_boot_step({external_infrastructure,
[{description, "external infrastructure ready"}]}).
--rabbit_boot_step({rabbit_exchange_type_registry,
- [{description, "exchange type registry"},
+-rabbit_boot_step({rabbit_registry,
+ [{description, "plugin registry"},
{mfa, {rabbit_sup, start_child,
- [rabbit_exchange_type_registry]}},
+ [rabbit_registry]}},
{requires, external_infrastructure},
{enables, kernel_ready}]}).
@@ -458,16 +458,16 @@ insert_default_data() ->
{ok, DefaultVHost} = application:get_env(default_vhost),
{ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
application:get_env(default_permissions),
- ok = rabbit_access_control:add_vhost(DefaultVHost),
- ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
+ ok = rabbit_vhost:add(DefaultVHost),
+ ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass),
case DefaultAdmin of
- true -> rabbit_access_control:set_admin(DefaultUser);
+ true -> rabbit_auth_backend_internal:set_admin(DefaultUser);
_ -> ok
end,
- ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost,
- DefaultConfigurePerm,
- DefaultWritePerm,
- DefaultReadPerm),
+ ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost,
+ DefaultConfigurePerm,
+ DefaultWritePerm,
+ DefaultReadPerm),
ok.
rotate_logs(File, Suffix, Handler) ->
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index bc588013..02a65442 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -30,419 +30,123 @@
%%
-module(rabbit_access_control).
--include_lib("stdlib/include/qlc.hrl").
+
-include("rabbit.hrl").
--export([check_login/2, user_pass_login/2, check_user_pass_login/2,
- check_vhost_access/2, check_resource_access/3]).
--export([add_user/2, delete_user/1, change_password/2, set_admin/1,
- clear_admin/1, list_users/0, lookup_user/1]).
--export([change_password_hash/2, hash_password/1]).
--export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
--export([set_permissions/5, clear_permissions/2,
- list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
- list_user_vhost_permissions/2]).
+-export([user_pass_login/2, check_user_pass_login/2, check_user_login/2,
+ check_vhost_access/2, check_resource_access/3, list_vhosts/2]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([username/0, password/0, password_hash/0]).
+-export_type([permission_atom/0, vhost_permission_atom/0]).
-type(permission_atom() :: 'configure' | 'read' | 'write').
--type(username() :: binary()).
--type(password() :: binary()).
--type(password_hash() :: binary()).
--type(regexp() :: binary()).
--spec(check_login/2 ::
- (binary(), binary()) -> rabbit_types:user() |
- rabbit_types:channel_exit()).
+-type(vhost_permission_atom() :: 'read' | 'write').
+
-spec(user_pass_login/2 ::
- (username(), password())
+ (rabbit_types:username(), rabbit_types:password())
-> rabbit_types:user() | rabbit_types:channel_exit()).
-spec(check_user_pass_login/2 ::
- (username(), password())
- -> {'ok', rabbit_types:user()} | 'refused').
+ (rabbit_types:username(), rabbit_types:password())
+ -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
-> 'ok' | rabbit_types:channel_exit()).
-spec(check_resource_access/3 ::
- (username(), rabbit_types:r(atom()), permission_atom())
+ (rabbit_types:user(), rabbit_types:r(atom()), permission_atom())
-> 'ok' | rabbit_types:channel_exit()).
--spec(add_user/2 :: (username(), password()) -> 'ok').
--spec(delete_user/1 :: (username()) -> 'ok').
--spec(change_password/2 :: (username(), password()) -> 'ok').
--spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok').
--spec(hash_password/1 :: (password()) -> password_hash()).
--spec(set_admin/1 :: (username()) -> 'ok').
--spec(clear_admin/1 :: (username()) -> 'ok').
--spec(list_users/0 :: () -> [{username(), boolean()}]).
--spec(lookup_user/1 ::
- (username()) -> rabbit_types:ok(rabbit_types:user())
- | rabbit_types:error('not_found')).
--spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
--spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
--spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
--spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
- regexp(), regexp()) -> 'ok').
--spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
--spec(list_permissions/0 ::
- () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
--spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
--spec(list_user_permissions/1 ::
- (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
--spec(list_user_vhost_permissions/2 ::
- (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]).
+-spec(list_vhosts/2 :: (rabbit_types:user(), vhost_permission_atom())
+ -> [rabbit_types:vhost()]).
-endif.
%%----------------------------------------------------------------------------
-%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
-%% apparently, by OpenAMQ.
-check_login(<<"PLAIN">>, Response) ->
- [User, Pass] = [list_to_binary(T) ||
- T <- string:tokens(binary_to_list(Response), [0])],
- user_pass_login(User, Pass);
-%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
-%% defines this as PLAIN, but in 0-9 that definition is gone, instead
-%% referring generically to "SASL security mechanism", i.e. the above.
-check_login(<<"AMQPLAIN">>, Response) ->
- LoginTable = rabbit_binary_parser:parse_table(Response),
- case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
- lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
- {{value, {_, longstr, User}},
- {value, {_, longstr, Pass}}} ->
- user_pass_login(User, Pass);
- _ ->
- %% Is this an information leak?
- rabbit_misc:protocol_error(
- access_refused,
- "AMQPPLAIN auth info ~w is missing LOGIN or PASSWORD field",
- [LoginTable])
- end;
-
-check_login(Mechanism, _Response) ->
- rabbit_misc:protocol_error(
- access_refused, "unsupported authentication mechanism '~s'",
- [Mechanism]).
-
user_pass_login(User, Pass) ->
?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]),
case check_user_pass_login(User, Pass) of
- refused ->
+ {refused, Msg, Args} ->
rabbit_misc:protocol_error(
- access_refused, "login refused for user '~s'", [User]);
+ access_refused, "login refused: ~s", [io_lib:format(Msg, Args)]);
{ok, U} ->
U
end.
-check_user_pass_login(User, Pass) ->
- case lookup_user(User) of
- {ok, U} ->
- case check_password(Pass, U#user.password_hash) of
- true -> {ok, U};
- _ -> refused
- end;
- {error, not_found} ->
- refused
- end.
-
-internal_lookup_vhost_access(Username, VHostPath) ->
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHostPath}}) of
- [] -> not_found;
- [R] -> {ok, R}
- end
- end).
-
-check_vhost_access(#user{username = Username}, VHostPath) ->
+check_user_pass_login(Username, Password) ->
+ check_user_login(Username, [{password, Password}]).
+
+check_user_login(Username, AuthProps) ->
+ {ok, Modules} = application:get_env(rabbit, auth_backends),
+ lists:foldl(
+ fun(Module, {refused, _, _}) ->
+ case Module:check_user_login(Username, AuthProps) of
+ {error, E} ->
+ {refused, "~s failed authenticating ~s: ~p~n",
+ [Module, Username, E]};
+ Else ->
+ Else
+ end;
+ (_, {ok, User}) ->
+ {ok, User}
+ end, {refused, "No modules checked '~s'", [Username]}, Modules).
+
+check_vhost_access(User = #user{ username = Username,
+ auth_backend = Module }, VHostPath) ->
?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
- case internal_lookup_vhost_access(Username, VHostPath) of
- {ok, _R} ->
- ok;
- not_found ->
- rabbit_misc:protocol_error(
- access_refused, "access to vhost '~s' refused for user '~s'",
- [VHostPath, Username])
- end.
-
-permission_index(configure) -> #permission.configure;
-permission_index(write) -> #permission.write;
-permission_index(read) -> #permission.read.
-
-check_resource_access(Username,
- R = #resource{kind = exchange, name = <<"">>},
+ check_access(
+ fun() ->
+ rabbit_vhost:exists(VHostPath) andalso
+ Module:check_vhost_access(User, VHostPath, write)
+ end,
+ "~s failed checking vhost access to ~s for ~s: ~p~n",
+ [Module, VHostPath, Username],
+ "access to vhost '~s' refused for user '~s'",
+ [VHostPath, Username]).
+
+check_resource_access(User, R = #resource{kind = exchange, name = <<"">>},
Permission) ->
- check_resource_access(Username,
- R#resource{name = <<"amq.default">>},
+ check_resource_access(User, R#resource{name = <<"amq.default">>},
Permission);
-check_resource_access(Username,
- R = #resource{virtual_host = VHostPath, name = Name},
- Permission) ->
- Res = case mnesia:dirty_read({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHostPath}}) of
- [] ->
- false;
- [#user_permission{permission = P}] ->
- PermRegexp =
- case element(permission_index(Permission), P) of
- %% <<"^$">> breaks Emacs' erlang mode
- <<"">> -> <<$^, $$>>;
- RE -> RE
- end,
- case re:run(Name, PermRegexp, [{capture, none}]) of
- match -> true;
- nomatch -> false
- end
- end,
- if Res -> ok;
- true -> rabbit_misc:protocol_error(
- access_refused, "access to ~s refused for user '~s'",
- [rabbit_misc:rs(R), Username])
- end.
-
-add_user(Username, Password) ->
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_user, Username}) of
- [] ->
- ok = mnesia:write(rabbit_user,
- #user{username = Username,
- password_hash =
- hash_password(Password),
- is_admin = false},
- write);
- _ ->
- mnesia:abort({user_already_exists, Username})
- end
- end),
- rabbit_log:info("Created user ~p~n", [Username]),
- R.
-
-delete_user(Username) ->
- R = rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- ok = mnesia:delete({rabbit_user, Username}),
- [ok = mnesia:delete_object(
- rabbit_user_permission, R, write) ||
- R <- mnesia:match_object(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = '_'},
- permission = '_'},
- write)],
- ok
- end)),
- rabbit_log:info("Deleted user ~p~n", [Username]),
- R.
-
-change_password(Username, Password) ->
- change_password_hash(Username, hash_password(Password)).
-
-change_password_hash(Username, PasswordHash) ->
- R = update_user(Username, fun(User) ->
- User#user{ password_hash = PasswordHash }
- end),
- rabbit_log:info("Changed password for user ~p~n", [Username]),
- R.
-
-hash_password(Cleartext) ->
- Salt = make_salt(),
- Hash = salted_md5(Salt, Cleartext),
- <<Salt/binary, Hash/binary>>.
-
-check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) ->
- Hash =:= salted_md5(Salt, Cleartext).
-
-make_salt() ->
- {A1,A2,A3} = now(),
- random:seed(A1, A2, A3),
- Salt = random:uniform(16#ffffffff),
- <<Salt:32>>.
-
-salted_md5(Salt, Cleartext) ->
- Salted = <<Salt/binary, Cleartext/binary>>,
- erlang:md5(Salted).
-
-set_admin(Username) ->
- set_admin(Username, true).
-
-clear_admin(Username) ->
- set_admin(Username, false).
-
-set_admin(Username, IsAdmin) ->
- R = update_user(Username, fun(User) ->
- User#user{is_admin = IsAdmin}
- end),
- rabbit_log:info("Set user admin flag for user ~p to ~p~n",
- [Username, IsAdmin]),
- R.
-
-update_user(Username, Fun) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- {ok, User} = lookup_user(Username),
- ok = mnesia:write(rabbit_user, Fun(User), write)
- end)).
-
-list_users() ->
- [{Username, IsAdmin} ||
- #user{username = Username, is_admin = IsAdmin} <-
- mnesia:dirty_match_object(rabbit_user, #user{_ = '_'})].
-
-lookup_user(Username) ->
- rabbit_misc:dirty_read({rabbit_user, Username}).
-
-add_vhost(VHostPath) ->
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_vhost, VHostPath}) of
- [] ->
- ok = mnesia:write(rabbit_vhost,
- #vhost{virtual_host = VHostPath},
- write),
- [rabbit_exchange:declare(
- rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
- ok;
- [_] ->
- mnesia:abort({vhost_already_exists, VHostPath})
- end
- end),
- rabbit_log:info("Added vhost ~p~n", [VHostPath]),
- R.
-
-delete_vhost(VHostPath) ->
- %%FIXME: We are forced to delete the queues outside the TX below
- %%because queue deletion involves sending messages to the queue
- %%process, which in turn results in further mnesia actions and
- %%eventually the termination of that process.
- lists:foreach(fun (Q) ->
- {ok,_} = rabbit_amqqueue:delete(Q, false, false)
- end,
- rabbit_amqqueue:list(VHostPath)),
- R = rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_vhost(
- VHostPath,
- fun () ->
- ok = internal_delete_vhost(VHostPath)
- end)),
- rabbit_log:info("Deleted vhost ~p~n", [VHostPath]),
- R.
-
-internal_delete_vhost(VHostPath) ->
- lists:foreach(fun (#exchange{name = Name}) ->
- ok = rabbit_exchange:delete(Name, false)
- end,
- rabbit_exchange:list(VHostPath)),
- lists:foreach(fun ({Username, _, _, _}) ->
- ok = clear_permissions(Username, VHostPath)
- end,
- list_vhost_permissions(VHostPath)),
- ok = mnesia:delete({rabbit_vhost, VHostPath}),
- ok.
-
-vhost_exists(VHostPath) ->
- mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].
-
-list_vhosts() ->
- mnesia:dirty_all_keys(rabbit_vhost).
-
-validate_regexp(RegexpBin) ->
- Regexp = binary_to_list(RegexpBin),
- case re:compile(Regexp) of
- {ok, _} -> ok;
- {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
+check_resource_access(User = #user{username = Username, auth_backend = Module},
+ Resource, Permission) ->
+ check_access(
+ fun() -> Module:check_resource_access(User, Resource, Permission) end,
+ "~s failed checking resource access to ~p for ~s: ~p~n",
+ [Module, Resource, Username],
+ "access to ~s refused for user '~s'",
+ [rabbit_misc:rs(Resource), Username]).
+
+check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) ->
+ Allow = case Fun() of
+ {error, _} = E ->
+ rabbit_log:error(ErrStr, ErrArgs ++ [E]),
+ false;
+ Else ->
+ Else
+ end,
+ case Allow of
+ true ->
+ ok;
+ false ->
+ rabbit_misc:protocol_error(access_refused, RefStr, RefArgs)
end.
-set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
- lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- fun () -> ok = mnesia:write(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = VHostPath},
- permission = #permission{
- configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}},
- write)
- end)).
-
-
-clear_permissions(Username, VHostPath) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- fun () ->
- ok = mnesia:delete({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHostPath}})
- end)).
-
-list_permissions() ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(match_user_vhost('_', '_'))].
-
-list_vhost_permissions(VHostPath) ->
- [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
- {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(rabbit_misc:with_vhost(
- VHostPath, match_user_vhost('_', VHostPath)))].
-
-list_user_permissions(Username) ->
- [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(rabbit_misc:with_user(
- Username, match_user_vhost(Username, '_')))].
-
-list_user_vhost_permissions(Username, VHostPath) ->
- [{ConfigurePerm, WritePerm, ReadPerm} ||
- {_, _, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- match_user_vhost(Username, VHostPath)))].
-
-list_permissions(QueryThunk) ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- #user_permission{user_vhost = #user_vhost{username = Username,
- virtual_host = VHostPath},
- permission = #permission{ configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(QueryThunk)].
-
-match_user_vhost(Username, VHostPath) ->
- fun () -> mnesia:match_object(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = VHostPath},
- permission = '_'},
- read)
- end.
+%% Permission = write -> log in
+%% Permission = read -> learn of the existence of (only relevant for
+%% management plugin)
+list_vhosts(User = #user{username = Username, auth_backend = Module},
+ Permission) ->
+ lists:filter(
+ fun(VHost) ->
+ case Module:check_vhost_access(User, VHost, Permission) of
+ {error, _} = E ->
+ rabbit_log:warning("~w failed checking vhost access "
+ "to ~s for ~s: ~p~n",
+ [Module, VHost, Username, E]),
+ false;
+ Else ->
+ Else
+ end
+ end, rabbit_vhost:list()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 775c631d..35ed1c94 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -152,9 +152,9 @@
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit()).
-spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok').
+ (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(maybe_run_queue_via_backing_queue_async/2 ::
- (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok').
+ (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -503,19 +503,17 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid}.
safe_delegate_call_ok(F, Pids) ->
- {_, Bad} = delegate:invoke(Pids,
- fun (Pid) ->
+ case delegate:invoke(Pids, fun (Pid) ->
rabbit_misc:with_exit_handler(
fun () -> ok end,
fun () -> F(Pid) end)
- end),
- case Bad of
- [] -> ok;
- _ -> {error, Bad}
+ end) of
+ {_, []} -> ok;
+ {_, Bad} -> {error, Bad}
end.
delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_cast(Pid, Msg) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
+ delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 78bb6835..38b83117 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -199,6 +199,8 @@ terminate_shutdown(Fun, State) ->
BQ:tx_rollback(Txn, BQSN),
BQSN1
end, BQS, all_ch_record()),
+ [emit_consumer_deleted(Ch, CTag)
+ || {Ch, CTag, _} <- consumers(State1)],
rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -226,7 +228,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL,
rabbit_amqqueue, maybe_run_queue_via_backing_queue,
- [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]),
+ [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -372,12 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- {State2, ChAckTags1} =
+ ChAckTags1 =
case AckRequired of
- true -> {State1,
- sets:add_element(AckTag, ChAckTags)};
- false -> {confirm_message(Message, State1),
- ChAckTags}
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -394,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State3 = State2#q{
+ State2 = State1#q{
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State3);
+ deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
true = maybe_store_ch_record(C#cr{is_limit_active = true}),
@@ -425,22 +425,36 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-confirm_messages(Guids, State) ->
- lists:foldl(fun confirm_message_by_guid/2, State, Guids).
-
-confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
- case dict:find(Guid, GTC) of
- {ok, {_ , undefined}} -> ok;
- {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
+confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
+ {CMs, GTC1} =
+ lists:foldl(
+ fun(Guid, {CMs, GTC0}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)};
+ _ ->
+ {CMs, GTC0}
+ end
+ end, {[], GTC}, Guids),
+ case lists:usort(CMs) of
+ [{Ch, MsgSeqNo} | CMs1] ->
+ [rabbit_channel:confirm(ChPid, MsgSeqNos) ||
+ {ChPid, MsgSeqNos} <- group_confirms_by_channel(
+ CMs1, [{Ch, [MsgSeqNo]}])];
+ [] ->
+ ok
end,
- State#q{guid_to_channel = dict:erase(Guid, GTC)}.
+ State#q{guid_to_channel = GTC1}.
-confirm_message(#basic_message{guid = Guid}, State) ->
- confirm_message_by_guid(Guid, State).
+group_confirms_by_channel([], Acc) ->
+ Acc;
+group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]);
+group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]).
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- State;
+ {no_confirm, State};
record_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
@@ -449,14 +463,10 @@ record_confirm_message(#delivery{sender = ChPid,
State =
#q{guid_to_channel = GTC,
q = #amqqueue{durable = true}}) ->
- State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)};
+ {confirm,
+ State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}};
record_confirm_message(_Delivery, State) ->
- State.
-
-ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckdGuids, BQS1} = BQ:ack(AckTags, BQS),
- confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}).
+ {no_confirm, State}.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -471,12 +481,12 @@ attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
- State = #q{backing_queue = BQ, q = Q}) ->
- NeedsConfirming = Message#basic_message.is_persistent andalso
- Q#amqqueue.durable,
- case NeedsConfirming of
- false -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
+ {NeedsConfirming, State = #q{backing_queue = BQ}}) ->
+ %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
+ case {NeedsConfirming, MsgSeqNo} of
+ {_, undefined} -> ok;
+ {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ {confirm, _} -> ok
end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -488,31 +498,37 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = NeedsConfirming},
+ needs_confirming = (NeedsConfirming =:= confirm)},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
+ {Delivered, State1} =
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
+ {Delivered, NeedsConfirming, State1};
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ {NeedsConfirming,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
record_current_channel_tx(ChPid, Txn),
{true,
+ NeedsConfirming,
State#q{backing_queue_state =
BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, State1} ->
+ {true, _, State1} ->
{true, State1};
- {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery,
+ {false, NeedsConfirming, State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}} ->
+ #delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
- needs_confirming = (MsgSeqNo =/= undefined)},
+ needs_confirming =
+ (NeedsConfirming =:= confirm)},
BQS),
{false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
@@ -520,7 +536,7 @@ deliver_or_enqueue(Delivery, State) ->
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
maybe_run_queue_via_backing_queue(
fun (BQS) ->
- BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)
+ {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)}
end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
@@ -536,12 +552,19 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
end, Queue).
remove_consumers(ChPid, Queue) ->
- queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue).
+ {Kept, Removed} = split_by_channel(ChPid, Queue),
+ [emit_consumer_deleted(Ch, CTag) ||
+ {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)],
+ Kept.
move_consumers(ChPid, From, To) ->
+ {Kept, Removed} = split_by_channel(ChPid, From),
+ {Kept, queue:join(To, Removed)}.
+
+split_by_channel(ChPid, Queue) ->
{Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(From)),
- {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
+ queue:to_list(Queue)),
+ {queue:from_list(Kept), queue:from_list(Removed)}.
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -617,12 +640,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {BQS2, State1} =
- case Fun(BQS) of
- {{confirm, Guids}, BQS1} -> {BQS1, confirm_messages(Guids, State)};
- BQS1 -> {BQS1, State}
- end,
- run_message_queue(State1#q{backing_queue_state = BQS2}).
+ {Guids, BQS1} = Fun(BQS),
+ run_message_queue(
+ confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
@@ -724,12 +744,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+consumers(#q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+ rabbit_event:notify(consumer_created,
+ [{consumer_tag, ConsumerTag},
+ {exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {channel, ChPid},
+ {queue, self()}]).
+
+emit_consumer_deleted(ChPid, ConsumerTag) ->
+ rabbit_event:notify(consumer_deleted,
+ [{consumer_tag, ConsumerTag},
+ {channel, ChPid},
+ {queue, self()}]).
+
%---------------------------------------------------------------------------
prioritise_call(Msg, _From, _State) ->
@@ -743,18 +785,19 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {maybe_run_queue_via_backing_queue, _Fun} -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -792,16 +835,10 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From,
- State = #q{active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
- reply(rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+handle_call(consumers, _From, State) ->
+ reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
+handle_call({deliver_immediately, Delivery},
_From, State) ->
%% Synchronous, "immediate" delivery mode
%%
@@ -816,17 +853,15 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, State1} =
+ {Delivered, _NeedsConfirming, State1} =
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, case Delivered of
- true -> State1;
- false -> confirm_message(Message, State1)
- end);
+ reply(Delivered, State1);
-handle_call({deliver, Delivery}, _From, State) ->
- %% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- reply(Delivered, NewState);
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" delivery mode. Reply asap.
+ gen_server2:reply(From, true),
+ {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
+ noreply(NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
NewState = commit_transaction(Txn, From, ChPid, State),
@@ -859,7 +894,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
sets:add_element(AckTag,
ChAckTags)}),
State2;
- false -> confirm_message(Message, State2)
+ false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
reply({ok, Remaining, Msg}, State3)
@@ -902,6 +937,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ChPid, Consumer,
State1#q.active_consumers)})
end,
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck),
reply(ok, State2)
end;
@@ -920,6 +957,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
C1#cr{limiter_pid = undefined};
_ -> C1
end),
+ emit_consumer_deleted(ChPid, ConsumerTag),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -994,8 +1032,8 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- NewState = ack_by_acktags(AckTags, State),
- {NewC, NewState};
+ BQS1 = BQ:ack(AckTags, BQS),
+ {NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
State#q{backing_queue_state = BQS1}}
@@ -1004,7 +1042,9 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State1)
end;
-handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
+handle_cast({reject, AckTags, Requeue, ChPid},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -1013,7 +1053,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> ack_by_acktags(AckTags, State)
+ false -> BQS1 = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1}
end)
end;
@@ -1107,7 +1148,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:idle_timeout(BQS) end, State));
+ fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
new file mode 100644
index 00000000..0dc8e61b
--- /dev/null
+++ b/src/rabbit_auth_backend.erl
@@ -0,0 +1,76 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_backend).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% A description proplist as with auth mechanisms,
+ %% exchanges. Currently unused.
+ {description, 0},
+
+ %% Check a user can log in, given a username and a proplist of
+ %% authentication information (e.g. [{password, Password}]).
+ %%
+ %% Possible responses:
+ %% {ok, User}
+ %% Authentication succeeded, and here's the user record.
+ %% {error, Error}
+ %% Something went wrong. Log and die.
+ %% {refused, Msg, Args}
+ %% Client failed authentication. Log and die.
+ {check_user_login, 2},
+
+ %% Given #user, vhost path and permission, can a user access a vhost?
+ %% Permission is read - learn of the existence of (only relevant for
+ %% management plugin)
+ %% or write - log in
+ %%
+ %% Possible responses:
+ %% true
+ %% false
+ %% {error, Error}
+ %% Something went wrong. Log and die.
+ {check_vhost_access, 3},
+
+ %% Given #user, resource and permission, can a user access a resource?
+ %%
+ %% Possible responses:
+ %% true
+ %% false
+ %% {error, Error}
+ %% Something went wrong. Log and die.
+ {check_resource_access, 3}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
new file mode 100644
index 00000000..233e2b90
--- /dev/null
+++ b/src/rabbit_auth_backend_internal.erl
@@ -0,0 +1,347 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_backend_internal).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_backend).
+
+-export([description/0]).
+-export([check_user_login/2, check_vhost_access/3, check_resource_access/3]).
+
+-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
+ clear_admin/1, list_users/0, lookup_user/1, clear_password/1]).
+-export([make_salt/0, check_password/2, change_password_hash/2,
+ hash_password/1]).
+-export([set_permissions/5, clear_permissions/2,
+ list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
+ list_user_vhost_permissions/2]).
+
+-include("rabbit_auth_backend_spec.hrl").
+
+-ifdef(use_specs).
+
+-type(regexp() :: binary()).
+
+-spec(add_user/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok').
+-spec(delete_user/1 :: (rabbit_types:username()) -> 'ok').
+-spec(change_password/2 :: (rabbit_types:username(), rabbit_types:password())
+ -> 'ok').
+-spec(clear_password/1 :: (rabbit_types:username()) -> 'ok').
+-spec(make_salt/0 :: () -> binary()).
+-spec(check_password/2 :: (rabbit_types:password(),
+ rabbit_types:password_hash()) -> boolean()).
+-spec(change_password_hash/2 :: (rabbit_types:username(),
+ rabbit_types:password_hash()) -> 'ok').
+-spec(hash_password/1 :: (rabbit_types:password())
+ -> rabbit_types:password_hash()).
+-spec(set_admin/1 :: (rabbit_types:username()) -> 'ok').
+-spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok').
+-spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]).
+-spec(lookup_user/1 :: (rabbit_types:username())
+ -> rabbit_types:ok(rabbit_types:internal_user())
+ | rabbit_types:error('not_found')).
+-spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(),
+ regexp(), regexp(), regexp()) -> 'ok').
+-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
+ -> 'ok').
+-spec(list_permissions/0 ::
+ () -> [{rabbit_types:username(), rabbit_types:vhost(),
+ regexp(), regexp(), regexp()}]).
+-spec(list_vhost_permissions/1 ::
+ (rabbit_types:vhost()) -> [{rabbit_types:username(),
+ regexp(), regexp(), regexp()}]).
+-spec(list_user_permissions/1 ::
+ (rabbit_types:username()) -> [{rabbit_types:vhost(),
+ regexp(), regexp(), regexp()}]).
+-spec(list_user_vhost_permissions/2 ::
+ (rabbit_types:username(), rabbit_types:vhost())
+ -> [{regexp(), regexp(), regexp()}]).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% Implementation of rabbit_auth_backend
+
+description() ->
+ [{name, <<"Internal">>},
+ {description, <<"Internal user / password database">>}].
+
+check_user_login(Username, []) ->
+ internal_check_user_login(Username, fun(_) -> true end);
+check_user_login(Username, [{password, Password}]) ->
+ internal_check_user_login(
+ Username,
+ fun(#internal_user{password_hash = Hash}) ->
+ check_password(Password, Hash)
+ end);
+check_user_login(Username, AuthProps) ->
+ exit({unknown_auth_props, Username, AuthProps}).
+
+internal_check_user_login(Username, Fun) ->
+ Refused = {refused, "user '~s' - invalid credentials", [Username]},
+ case lookup_user(Username) of
+ {ok, User = #internal_user{is_admin = IsAdmin}} ->
+ case Fun(User) of
+ true -> {ok, #user{username = Username,
+ is_admin = IsAdmin,
+ auth_backend = ?MODULE,
+ impl = User}};
+ _ -> Refused
+ end;
+ {error, not_found} ->
+ Refused
+ end.
+
+check_vhost_access(#user{is_admin = true}, _VHostPath, read) ->
+ true;
+
+check_vhost_access(#user{username = Username}, VHostPath, _) ->
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] -> false;
+ [_R] -> true
+ end
+ end).
+
+check_resource_access(#user{username = Username},
+ #resource{virtual_host = VHostPath, name = Name},
+ Permission) ->
+ case mnesia:dirty_read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] ->
+ false;
+ [#user_permission{permission = P}] ->
+ PermRegexp =
+ case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
+ case re:run(Name, PermRegexp, [{capture, none}]) of
+ match -> true;
+ nomatch -> false
+ end
+ end.
+
+permission_index(configure) -> #permission.configure;
+permission_index(write) -> #permission.write;
+permission_index(read) -> #permission.read.
+
+%%----------------------------------------------------------------------------
+%% Manipulation of the user database
+
+add_user(Username, Password) ->
+ R = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_user, Username}) of
+ [] ->
+ ok = mnesia:write(
+ rabbit_user,
+ #internal_user{username = Username,
+ password_hash =
+ hash_password(Password),
+ is_admin = false},
+ write);
+ _ ->
+ mnesia:abort({user_already_exists, Username})
+ end
+ end),
+ rabbit_log:info("Created user ~p~n", [Username]),
+ R.
+
+delete_user(Username) ->
+ R = rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ ok = mnesia:delete({rabbit_user, Username}),
+ [ok = mnesia:delete_object(
+ rabbit_user_permission, R, write) ||
+ R <- mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = '_'},
+ permission = '_'},
+ write)],
+ ok
+ end)),
+ rabbit_log:info("Deleted user ~p~n", [Username]),
+ R.
+
+change_password(Username, Password) ->
+ change_password_hash(Username, hash_password(Password)).
+
+clear_password(Username) ->
+ change_password_hash(Username, <<"">>).
+
+change_password_hash(Username, PasswordHash) ->
+ R = update_user(Username, fun(User) ->
+ User#internal_user{
+ password_hash = PasswordHash }
+ end),
+ rabbit_log:info("Changed password for user ~p~n", [Username]),
+ R.
+
+hash_password(Cleartext) ->
+ Salt = make_salt(),
+ Hash = salted_md5(Salt, Cleartext),
+ <<Salt/binary, Hash/binary>>.
+
+check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) ->
+ Hash =:= salted_md5(Salt, Cleartext).
+
+make_salt() ->
+ {A1,A2,A3} = now(),
+ random:seed(A1, A2, A3),
+ Salt = random:uniform(16#ffffffff),
+ <<Salt:32>>.
+
+salted_md5(Salt, Cleartext) ->
+ Salted = <<Salt/binary, Cleartext/binary>>,
+ erlang:md5(Salted).
+
+set_admin(Username) ->
+ set_admin(Username, true).
+
+clear_admin(Username) ->
+ set_admin(Username, false).
+
+set_admin(Username, IsAdmin) ->
+ R = update_user(Username, fun(User) ->
+ User#internal_user{is_admin = IsAdmin}
+ end),
+ rabbit_log:info("Set user admin flag for user ~p to ~p~n",
+ [Username, IsAdmin]),
+ R.
+
+update_user(Username, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ {ok, User} = lookup_user(Username),
+ ok = mnesia:write(rabbit_user, Fun(User), write)
+ end)).
+
+list_users() ->
+ [{Username, IsAdmin} ||
+ #internal_user{username = Username, is_admin = IsAdmin} <-
+ mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
+
+lookup_user(Username) ->
+ rabbit_misc:dirty_read({rabbit_user, Username}).
+
+validate_regexp(RegexpBin) ->
+ Regexp = binary_to_list(RegexpBin),
+ case re:compile(Regexp) of
+ {ok, _} -> ok;
+ {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
+ end.
+
+set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ fun () -> ok = mnesia:write(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}},
+ write)
+ end)).
+
+
+clear_permissions(Username, VHostPath) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ fun () ->
+ ok = mnesia:delete({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}})
+ end)).
+
+list_permissions() ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(match_user_vhost('_', '_'))].
+
+list_vhost_permissions(VHostPath) ->
+ [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_vhost:with(
+ VHostPath, match_user_vhost('_', VHostPath)))].
+
+list_user_permissions(Username) ->
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_user(
+ Username, match_user_vhost(Username, '_')))].
+
+list_user_vhost_permissions(Username, VHostPath) ->
+ [{ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ match_user_vhost(Username, VHostPath)))].
+
+list_permissions(QueryThunk) ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ #user_permission{user_vhost = #user_vhost{username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}} <-
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(QueryThunk)].
+
+match_user_vhost(Username, VHostPath) ->
+ fun () -> mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = '_'},
+ read)
+ end.
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
new file mode 100644
index 00000000..ce1b16ac
--- /dev/null
+++ b/src/rabbit_auth_mechanism.erl
@@ -0,0 +1,57 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% A description.
+ {description, 0},
+
+ %% Called before authentication starts. Should create a state
+ %% object to be passed through all the stages of authentication.
+ {init, 1},
+
+ %% Handle a stage of authentication. Possible responses:
+ %% {ok, User}
+ %% Authentication succeeded, and here's the user record.
+ %% {challenge, Challenge, NextState}
+ %% Another round is needed. Here's the state I want next time.
+ %% {protocol_error, Msg, Args}
+ %% Client got the protocol wrong. Log and die.
+ %% {refused, Msg, Args}
+ %% Client failed authentication. Log and die.
+ {handle_response, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
new file mode 100644
index 00000000..5d51d904
--- /dev/null
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -0,0 +1,70 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_amqplain).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism amqplain"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"AMQPLAIN">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
+%% defines this as PLAIN, but in 0-9 that definition is gone, instead
+%% referring generically to "SASL security mechanism", i.e. the above.
+
+description() ->
+ [{name, <<"AMQPLAIN">>},
+ {description, <<"QPid AMQPLAIN mechanism">>}].
+
+init(_Sock) ->
+ [].
+
+handle_response(Response, _State) ->
+ LoginTable = rabbit_binary_parser:parse_table(Response),
+ case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
+ lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
+ {{value, {_, longstr, User}},
+ {value, {_, longstr, Pass}}} ->
+ rabbit_access_control:check_user_pass_login(User, Pass);
+ _ ->
+ {protocol_error,
+ "AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field",
+ [LoginTable]}
+ end.
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
new file mode 100644
index 00000000..67665928
--- /dev/null
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -0,0 +1,74 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_cr_demo).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism cr-demo"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"RABBIT-CR-DEMO">>,
+ ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-record(state, {username = undefined}).
+
+%% Provides equivalent security to PLAIN but demos use of Connection.Secure(Ok)
+%% START-OK: Username
+%% SECURE: "Please tell me your password"
+%% SECURE-OK: "My password is ~s", [Password]
+
+description() ->
+ [{name, <<"RABBIT-CR-DEMO">>},
+ {description, <<"RabbitMQ Demo challenge-response authentication "
+ "mechanism">>}].
+
+init(_Sock) ->
+ #state{}.
+
+handle_response(Response, State = #state{username = undefined}) ->
+ {challenge, <<"Please tell me your password">>,
+ State#state{username = Response}};
+
+handle_response(Response, #state{username = Username}) ->
+ case Response of
+ <<"My password is ", Password/binary>> ->
+ rabbit_access_control:check_user_pass_login(Username, Password);
+ _ ->
+ {protocol_error, "Invalid response '~s'", [Response]}
+ end.
diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl
new file mode 100644
index 00000000..1c4e5c15
--- /dev/null
+++ b/src/rabbit_auth_mechanism_external.erl
@@ -0,0 +1,107 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_external).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-include_lib("public_key/include/public_key.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism external"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-record(state, {username = undefined}).
+
+%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials
+%% established by means external to the mechanism". We define that to
+%% mean the peer certificate's subject's CN.
+
+description() ->
+ [{name, <<"EXTERNAL">>},
+ {description, <<"SASL EXTERNAL authentication mechanism">>}].
+
+init(Sock) ->
+ Username = case rabbit_net:peercert(Sock) of
+ {ok, C} ->
+ CN = case rabbit_ssl:peer_cert_subject_item(
+ C, ?'id-at-commonName') of
+ not_found -> {refused, "no CN found", []};
+ CN0 -> list_to_binary(CN0)
+ end,
+ case config_sane() of
+ true -> CN;
+ false -> {refused, "configuration unsafe", []}
+ end;
+ {error, no_peercert} ->
+ {refused, "no peer certificate", []};
+ nossl ->
+ {refused, "not SSL connection", []}
+ end,
+ #state{username = Username}.
+
+handle_response(_Response, #state{username = Username}) ->
+ case Username of
+ {refused, _, _} = E ->
+ E;
+ _ ->
+ case rabbit_access_control:check_user_login(Username, []) of
+ {ok, User} ->
+ {ok, User};
+ {error, not_found} ->
+ %% This is not an information leak as we have to
+ %% have validated a client cert to get this far.
+ {refused, "user '~s' not found", [Username]}
+ end
+ end.
+
+%%--------------------------------------------------------------------------
+
+config_sane() ->
+ {ok, Opts} = application:get_env(ssl_options),
+ case {proplists:get_value(fail_if_no_peer_cert, Opts),
+ proplists:get_value(verify, Opts)} of
+ {true, verify_peer} ->
+ true;
+ {F, V} ->
+ rabbit_log:warning("EXTERNAL mechanism disabled, "
+ "fail_if_no_peer_cert=~p; "
+ "verify=~p~n", [F, V]),
+ false
+ end.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
new file mode 100644
index 00000000..e5f8f3e6
--- /dev/null
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_plain).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism plain"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"PLAIN">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
+%% apparently, by OpenAMQ.
+
+description() ->
+ [{name, <<"PLAIN">>},
+ {description, <<"SASL PLAIN authentication mechanism">>}].
+
+init(_Sock) ->
+ [].
+
+handle_response(Response, _State) ->
+ %% The '%%"' at the end of the next line is for Emacs
+ case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%"
+ [{capture, all_but_first, binary}]) of
+ {match, [User, Pass]} ->
+ rabbit_access_control:check_user_pass_login(User, Pass);
+ _ ->
+ {protocol_error, "response ~p invalid", [Response]}
+ end.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 352e76fd..8603d8d7 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -77,7 +77,7 @@ behaviour_info(callbacks) ->
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about.
+ %% about. Must return 1 guid per Ack, in the same order as Acks.
{ack, 2},
%% A publish, but in the context of a transaction.
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index a5297a70..e81066da 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -344,8 +344,7 @@ lookup_amqp_exception(#amqp_error{name = Name,
{ShouldClose, Code, ExplBin, Method};
lookup_amqp_exception(Other, Protocol) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {ShouldClose, Code, Text} =
- Protocol:lookup_amqp_exception(internal_error, Protocol),
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error),
{ShouldClose, Code, Text, none}.
amqp_exception_explanation(Text, Expl) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 668fb9bb..ccadf5af 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -305,7 +305,7 @@ table_for_resource(#resource{kind = queue}) -> rabbit_queue.
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
Module.
contains(Table, MatchHead) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a1db2ccf..7b5f096b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,10 +35,10 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2]).
+-export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]).
+-export([emit_stats/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -47,10 +47,9 @@
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host, most_recently_declared_queue,
+ user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
- held_confirms, unconfirmed, queues_for_msg}).
+ confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -72,8 +71,6 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -83,19 +80,21 @@
-type(channel_number() :: non_neg_integer()).
-spec(start_link/7 ::
- (channel_number(), pid(), pid(), rabbit_access_control:username(),
+ (channel_number(), pid(), pid(), rabbit_types:user(),
rabbit_types:vhost(), pid(),
fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -103,16 +102,14 @@
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
--spec(flush_multiple_acks/1 :: (pid()) -> 'ok').
--spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
StartLimiterFun) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username,
+ gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User,
VHost, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
@@ -121,6 +118,9 @@ do(Pid, Method) ->
do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content}).
+flush(Pid) ->
+ gen_server2:call(Pid, flush).
+
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
@@ -133,6 +133,9 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
+confirm(Pid, MsgSeqNos) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -156,18 +159,9 @@ info_all(Items) ->
emit_stats(Pid) ->
gen_server2:cast(Pid, emit_stats).
-flush(Pid) ->
- gen_server2:call(Pid, flush).
-
-flush_multiple_acks(Pid) ->
- gen_server2:cast(Pid, flush_multiple_acks).
-
-confirm(Pid, MsgSeqNo) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
-
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
@@ -183,7 +177,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
next_tag = 1,
uncommitted_ack_q = queue:new(),
unacked_message_q = queue:new(),
- username = Username,
+ user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
@@ -191,9 +185,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
- publish_seqno = 0,
- confirm_multiple = false,
- held_confirms = gb_sets:new(),
+ publish_seqno = 1,
unconfirmed = gb_sets:new(),
queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -215,6 +207,9 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+handle_call(flush, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -224,9 +219,6 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(flush, _From, State) ->
- reply(ok, State);
-
handle_call(_Request, _From, State) ->
noreply(State).
@@ -261,10 +253,10 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
noreply(State);
handle_cast({deliver, ConsumerTag, AckRequired,
- Msg = {_QName, QPid, _MsgId, Redelivered,
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = Content}}},
+ Msg = {_QName, QPid, _MsgId, Redelivered,
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = Content}}},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired,
@@ -291,11 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast(flush_multiple_acks, State) ->
- {noreply, flush_multiple(State)};
-
-handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}.
+handle_cast({confirm, MsgSeqNos, From}, State) ->
+ {noreply, confirm(MsgSeqNos, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{queues_for_msg = QFM}) ->
@@ -303,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> send_or_enqueue_ack(Msg, QPid, State0);
+ 0 -> confirm([Msg], QPid, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -311,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
-handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- State1 = flush_multiple(State),
rabbit_event:if_enabled(StatsTimer,
fun () ->
internal_emit_stats(
State, [{idle_since, now()}])
end),
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
- {hibernate, State1#ch{stats_timer = StatsTimer1}}.
+ {hibernate, State#ch{stats_timer = StatsTimer1}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -370,7 +358,7 @@ return_queue_declare_ok(#resource{name = ActualName},
message_count = MessageCount,
consumer_count = ConsumerCount}).
-check_resource_access(Username, Resource, Perm) ->
+check_resource_access(User, Resource, Perm) ->
V = {Resource, Perm},
Cache = case get(permission_cache) of
undefined -> [];
@@ -380,7 +368,7 @@ check_resource_access(Username, Resource, Perm) ->
case lists:member(V, Cache) of
true -> lists:delete(V, Cache);
false -> ok = rabbit_access_control:check_resource_access(
- Username, Resource, Perm),
+ User, Resource, Perm),
lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
end,
put(permission_cache, [V | CacheTail]),
@@ -390,14 +378,32 @@ clear_permission_cache() ->
erase(permission_cache),
ok.
-check_configure_permitted(Resource, #ch{username = Username}) ->
- check_resource_access(Username, Resource, configure).
+check_configure_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, configure).
+
+check_write_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, write).
-check_write_permitted(Resource, #ch{username = Username}) ->
- check_resource_access(Username, Resource, write).
+check_read_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, read).
-check_read_permitted(Resource, #ch{username = Username}) ->
- check_resource_access(Username, Resource, read).
+check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
+ ok;
+check_user_id_header(#'P_basic'{user_id = Username},
+ #ch{user = #user{username = Username}}) ->
+ ok;
+check_user_id_header(#'P_basic'{user_id = Claimed},
+ #ch{user = #user{username = Actual}}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "user_id property set to '~s' but "
+ "authenticated user was '~s'", [Claimed, Actual]).
+
+check_internal_exchange(#exchange{name = Name, internal = true}) ->
+ rabbit_misc:protocol_error(access_refused,
+ "cannot publish to internal ~s",
+ [rabbit_misc:rs(Name)]);
+check_internal_exchange(_) ->
+ ok.
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
@@ -465,51 +471,30 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-send_or_enqueue_ack(undefined, _QPid, State) ->
- State;
-send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+confirm([], _QPid, State) ->
State;
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{
- delivery_tag = MSN}),
- State1
- end, State);
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_confirm_timer(
- State1#ch{held_confirms = gb_sets:add(MSN, As)})
- end, State).
-
-do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
- State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
- %% clears references to MsgSeqNo and does ConfirmFun
- case gb_sets:is_element(MsgSeqNo, UC) of
- true ->
- Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC),
- case QPid of
- undefined ->
- ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1});
- _ ->
- {ok, Qs} = dict:find(MsgSeqNo, QFM),
- Qs1 = sets:del_element(QPid, Qs),
- case sets:size(Qs1) of
- 0 -> ConfirmFun(MsgSeqNo,
- State#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM),
- unconfirmed = Unconfirmed1});
- _ -> State#ch{queues_for_msg =
- dict:store(MsgSeqNo, Qs1, QFM)}
- end
- end;
- false ->
- State
- end.
+confirm(MsgSeqNos, QPid, State) ->
+ {DoneMessages, State1} =
+ lists:foldl(
+ fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0,
+ queues_for_msg = QFM0}}) ->
+ case gb_sets:is_element(MsgSeqNo, UC0) of
+ false -> {DMs, State0};
+ true -> Qs1 = sets:del_element(
+ QPid, dict:fetch(MsgSeqNo, QFM0)),
+ case sets:size(Qs1) of
+ 0 -> {[MsgSeqNo | DMs],
+ State0#ch{
+ queues_for_msg =
+ dict:erase(MsgSeqNo, QFM0),
+ unconfirmed =
+ gb_sets:delete(MsgSeqNo, UC0)}};
+ _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
+ {DMs, State0#ch{queues_for_msg = QFM1}}
+ end
+ end
+ end, {[], State}, MsgSeqNos),
+ send_confirms(DoneMessages, State1).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -539,19 +524,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+ check_internal_exchange(Exchange),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
+ check_user_id_header(DecodedContent#content.properties, State),
IsPersistent = is_message_persistent(DecodedContent),
- {MsgSeqNo, State1}
- = case ConfirmEnabled of
- false -> {undefined, State};
- true -> SeqNo = State#ch.publish_seqno,
- {SeqNo,
- State#ch{publish_seqno = SeqNo + 1,
- unconfirmed =
- gb_sets:add(SeqNo, State#ch.unconfirmed)}}
- end,
+ {MsgSeqNo, State1} =
+ case ConfirmEnabled of
+ false -> {undefined, State};
+ true -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
@@ -771,7 +755,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = false,
durable = Durable,
auto_delete = AutoDelete,
- internal = false,
+ internal = Internal,
nowait = NoWait,
arguments = Args},
_, State = #ch{virtual_host = VHostPath}) ->
@@ -794,10 +778,11 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
CheckedType,
Durable,
AutoDelete,
+ Internal,
Args)
end,
ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
- AutoDelete, Args),
+ AutoDelete, Internal, Args),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
@@ -988,20 +973,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from tx to confirm mode", []);
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = false}) ->
- return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple},
+handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
+ return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = true,
- confirm_multiple = Multiple}) ->
- return_ok(State, NoWait, #'confirm.select_ok'{});
-
-handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot change confirm_multiple setting", []);
-
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
@@ -1231,27 +1206,53 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ send_confirms([MsgSeqNo], State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ send_confirms([MsgSeqNo], State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ send_confirms([MsgSeqNo], State);
process_routing_result(routed, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, MsgSeqNo, _,
- State = #ch{queues_for_msg = QFM}) ->
- QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
+process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
+ #ch{queues_for_msg = QFM, unconfirmed = UC} = State,
[maybe_monitor(QPid) || QPid <- QPids],
- State#ch{queues_for_msg = QFM1}.
+ State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
+ unconfirmed = gb_sets:add(MsgSeqNo, UC)}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
lock_message(false, _MsgStruct, State) ->
State.
-terminate(State) ->
- stop_confirm_timer(State),
+send_confirms([], State) ->
+ State;
+send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
+ send_confirm(MsgSeqNo, WriterPid),
+ State;
+send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ SCs = lists:usort(Cs),
+ CutOff = case gb_sets:is_empty(UC) of
+ true -> lists:last(SCs) + 1;
+ false -> gb_sets:smallest(UC)
+ end,
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
+ case Ms of
+ [] -> ok;
+ _ -> ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
+ multiple = true})
+ end,
+ [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
+ State.
+
+send_confirm(undefined, _WriterPid) ->
+ ok;
+send_confirm(SeqNo, WriterPid) ->
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = SeqNo}).
+
+terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
@@ -1260,7 +1261,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _) -> self();
i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
i(number, #ch{channel = Channel}) -> Channel;
-i(user, #ch{username = Username}) -> Username;
+i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
@@ -1337,42 +1338,3 @@ erase_queue_stats(QPid) ->
erase({queue_stats, QPid}),
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
-
-start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL,
- ?MODULE, flush_multiple_acks, [self()]),
- State#ch{confirm_tref = TRef};
-start_confirm_timer(State) ->
- State.
-
-stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- State;
-stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#ch{confirm_tref = undefined}.
-
-flush_multiple(State = #ch{writer_pid = WriterPid,
- held_confirms = Cs}) ->
- case gb_sets:is_empty(Cs) of
- true -> State#ch{confirm_tref = undefined};
- false -> [First | Rest] = gb_sets:to_list(Cs),
- {Mult, Inds} = find_consecutive_sequence(First, Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult, multiple = true}),
- ok = lists:foldl(
- fun(T, ok) -> rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = T})
- end, ok, Inds),
- State#ch{held_confirms = gb_sets:new(),
- confirm_tref = undefined}
- end.
-
-%% Find longest sequence of consecutive numbers at the beginning.
-find_consecutive_sequence(Last, []) ->
- {Last, []};
-find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
- find_consecutive_sequence(N, Ns);
-find_consecutive_sequence(Last, Ns) ->
- {Last, Ns}.
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 02199a65..9f50176d 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -48,15 +48,15 @@
-type(start_link_args() ::
{rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_access_control:username(), rabbit_types:vhost(), pid()}).
+ rabbit_types:user(), rabbit_types:vhost(), pid()}).
--spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
+-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
-endif.
%%----------------------------------------------------------------------------
-start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
+start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
@@ -69,16 +69,11 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, Username, VHost,
+ [Channel, ReaderPid, WriterPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
- {ok, FramingChannelPid} =
- supervisor2:start_child(
- SupPid,
- {framing_channel, {rabbit_framing_channel, start_link,
- [ReaderPid, ChannelPid, Protocol]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
- {ok, SupPid, FramingChannelPid}.
+ {ok, AState} = rabbit_command_assembler:init(Protocol),
+ {ok, SupPid, {ChannelPid, AState}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 21c39780..fd99af56 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -43,7 +43,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
- {'ok', pid(), pid()}).
+ {'ok', pid(), {pid(), any()}}).
-endif.
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
new file mode 100644
index 00000000..f8d3260e
--- /dev/null
+++ b/src/rabbit_command_assembler.erl
@@ -0,0 +1,148 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_command_assembler).
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+-export([analyze_frame/3, init/1, process/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY |
+ ?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY |
+ ?FRAME_TRACE | ?FRAME_HEARTBEAT).
+-type(protocol() :: rabbit_framing:protocol()).
+-type(method() :: rabbit_framing:amqp_method_record()).
+-type(class_id() :: rabbit_framing:amqp_class_id()).
+-type(weight() :: non_neg_integer()).
+-type(body_size() :: non_neg_integer()).
+-type(content() :: rabbit_types:undecoded_content()).
+
+-type(frame() ::
+ {'method', rabbit_framing:amqp_method_name(), binary()} |
+ {'content_header', class_id(), weight(), body_size(), binary()} |
+ {'content_body', binary()}).
+
+-type(state() ::
+ {'method', protocol()} |
+ {'content_header', method(), class_id(), protocol()} |
+ {'content_body', method(), body_size(), class_id(), protocol()}).
+
+-spec(analyze_frame/3 :: (frame_type(), binary(), protocol()) ->
+ frame() | 'heartbeat' | 'error').
+
+-spec(init/1 :: (protocol()) -> {ok, state()}).
+-spec(process/2 :: (frame(), state()) ->
+ {ok, state()} |
+ {ok, method(), state()} |
+ {ok, method(), content(), state()} |
+ {error, rabbit_types:amqp_error()}).
+
+-endif.
+
+%%--------------------------------------------------------------------
+
+analyze_frame(?FRAME_METHOD,
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+analyze_frame(?FRAME_HEADER,
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
+ {content_header, ClassId, Weight, BodySize, Properties};
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
+ {content_body, Body};
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
+ heartbeat;
+analyze_frame(_Type, _Body, _Protocol) ->
+ error.
+
+init(Protocol) -> {ok, {method, Protocol}}.
+
+process({method, MethodName, FieldsBin}, {method, Protocol}) ->
+ try
+ Method = Protocol:decode_method_fields(MethodName, FieldsBin),
+ case Protocol:method_has_content(MethodName) of
+ true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
+ {ok, {content_header, Method, ClassId, Protocol}};
+ false -> {ok, Method, {method, Protocol}}
+ end
+ catch exit:#amqp_error{} = Reason -> {error, Reason}
+ end;
+process(_Frame, {method, _Protocol}) ->
+ unexpected_frame("expected method frame, "
+ "got non method frame instead", [], none);
+process({content_header, ClassId, 0, 0, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, Method, Content, {method, Protocol}};
+process({content_header, ClassId, 0, BodySize, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, {content_body, Method, BodySize, Content, Protocol}};
+process({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin},
+ {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId], Method);
+process(_Frame, {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got non content header frame instead", [ClassId], Method);
+process({content_body, FragmentBin},
+ {content_body, Method, RemainingSize,
+ Content = #content{payload_fragments_rev = Fragments}, Protocol}) ->
+ NewContent = Content#content{
+ payload_fragments_rev = [FragmentBin | Fragments]},
+ case RemainingSize - size(FragmentBin) of
+ 0 -> {ok, Method, NewContent, {method, Protocol}};
+ Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}}
+ end;
+process(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) ->
+ unexpected_frame("expected content body, "
+ "got non content body frame instead", [], Method).
+
+%%--------------------------------------------------------------------
+
+empty_content(ClassId, PropertiesBin, Protocol) ->
+ #content{class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ protocol = Protocol,
+ payload_fragments_rev = []}.
+
+unexpected_frame(Format, Params, Method) when is_atom(Method) ->
+ {error, rabbit_misc:amqp_error(unexpected_frame, Format, Params, Method)};
+unexpected_frame(Format, Params, Method) ->
+ unexpected_frame(Format, Params, rabbit_misc:method_record_type(Method)).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index ff3995b5..a6b1f7fa 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -78,4 +78,3 @@ reader(Pid) ->
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
-
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 360217a2..8a3275bc 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -201,44 +201,48 @@ action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
Inform("Creating user ~p", [Username]),
- call(Node, {rabbit_access_control, add_user, Args});
+ call(Node, {rabbit_auth_backend_internal, add_user, Args});
action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
Inform("Deleting user ~p", Args),
- call(Node, {rabbit_access_control, delete_user, Args});
+ call(Node, {rabbit_auth_backend_internal, delete_user, Args});
action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
- call(Node, {rabbit_access_control, change_password, Args});
+ call(Node, {rabbit_auth_backend_internal, change_password, Args});
+
+action(clear_password, Node, Args = [Username], _Opts, Inform) ->
+ Inform("Clearing password for user ~p", [Username]),
+ call(Node, {rabbit_auth_backend_internal, clear_password, Args});
action(set_admin, Node, [Username], _Opts, Inform) ->
Inform("Setting administrative status for user ~p", [Username]),
- call(Node, {rabbit_access_control, set_admin, [Username]});
+ call(Node, {rabbit_auth_backend_internal, set_admin, [Username]});
action(clear_admin, Node, [Username], _Opts, Inform) ->
Inform("Clearing administrative status for user ~p", [Username]),
- call(Node, {rabbit_access_control, clear_admin, [Username]});
+ call(Node, {rabbit_auth_backend_internal, clear_admin, [Username]});
action(list_users, Node, [], _Opts, Inform) ->
Inform("Listing users", []),
- display_list(call(Node, {rabbit_access_control, list_users, []}));
+ display_list(call(Node, {rabbit_auth_backend_internal, list_users, []}));
action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Creating vhost ~p", Args),
- call(Node, {rabbit_access_control, add_vhost, Args});
+ call(Node, {rabbit_vhost, add, Args});
action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Deleting vhost ~p", Args),
- call(Node, {rabbit_access_control, delete_vhost, Args});
+ call(Node, {rabbit_vhost, delete, Args});
action(list_vhosts, Node, [], _Opts, Inform) ->
Inform("Listing vhosts", []),
- display_list(call(Node, {rabbit_access_control, list_vhosts, []}));
+ display_list(call(Node, {rabbit_vhost, list, []}));
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
Inform("Listing permissions for user ~p", Args),
- display_list(call(Node, {rabbit_access_control, list_user_permissions,
- Args}));
+ display_list(call(Node, {rabbit_auth_backend_internal,
+ list_user_permissions, Args}));
action(list_queues, Node, Args, Opts, Inform) ->
Inform("Listing queues", []),
@@ -292,19 +296,20 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
- call(Node, {rabbit_access_control, set_permissions,
+ call(Node, {rabbit_auth_backend_internal, set_permissions,
[Username, VHost, CPerm, WPerm, RPerm]});
action(clear_permissions, Node, [Username], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
- call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]});
+ call(Node, {rabbit_auth_backend_internal, clear_permissions,
+ [Username, VHost]});
action(list_permissions, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
- display_list(call(Node, {rabbit_access_control, list_vhost_permissions,
- [VHost]})).
+ display_list(call(Node, {rabbit_auth_backend_internal,
+ list_vhost_permissions, [VHost]})).
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 42861f86..dd009c83 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -49,7 +49,7 @@ boot() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ topic, true, false, false, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 00e479a2..a95cf0b1 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -33,11 +33,11 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
+-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
%% this must be run inside a mnesia tx
-export([maybe_auto_delete/1]).
--export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
+-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
@@ -49,13 +49,14 @@
-type(type() :: atom()).
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 ::
- (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table())
+-spec(declare/6 ::
+ (name(), type(), boolean(), boolean(), boolean(),
+ rabbit_framing:amqp_table())
-> rabbit_types:exchange()).
-spec(check_type/1 ::
(binary()) -> atom() | rabbit_types:connection_exit()).
--spec(assert_equivalence/5 ::
- (rabbit_types:exchange(), atom(), boolean(), boolean(),
+-spec(assert_equivalence/6 ::
+ (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table())
-> 'ok' | rabbit_types:connection_exit()).
-spec(assert_args_equivalence/2 ::
@@ -90,7 +91,7 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments]).
+-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
recover() ->
Xs = rabbit_misc:table_fold(
@@ -113,11 +114,12 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
recover_with_bindings([], [], []) ->
ok.
-declare(XName, Type, Durable, AutoDelete, Args) ->
+declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = #exchange{name = XName,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
+ internal = Internal,
arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
@@ -150,17 +152,17 @@ declare(XName, Type, Durable, AutoDelete, Args) ->
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
Module.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
- case rabbit_exchange_type_registry:binary_to_type(TypeBin) of
+ case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
rabbit_misc:protocol_error(
command_invalid, "unknown exchange type '~s'", [TypeBin]);
T ->
- case rabbit_exchange_type_registry:lookup_module(T) of
+ case rabbit_registry:lookup_module(exchange, T) of
{error, not_found} -> rabbit_misc:protocol_error(
command_invalid,
"invalid exchange type '~s'", [T]);
@@ -170,14 +172,16 @@ check_type(TypeBin) ->
assert_equivalence(X = #exchange{ durable = Durable,
auto_delete = AutoDelete,
+ internal = Internal,
type = Type},
- Type, Durable, AutoDelete, RequiredArgs) ->
+ Type, Durable, AutoDelete, Internal, RequiredArgs) ->
(type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
-assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
- _Args) ->
+assert_equivalence(#exchange{ name = Name },
+ _Type, _Durable, _Internal, _AutoDelete, _Args) ->
rabbit_misc:protocol_error(
precondition_failed,
- "cannot redeclare ~s with different type, durable or autodelete value",
+ "cannot redeclare ~s with different type, durable, "
+ "internal or autodelete value",
[rabbit_misc:rs(Name)]).
assert_args_equivalence(#exchange{ name = Name, arguments = Args },
@@ -215,6 +219,7 @@ i(name, #exchange{name = Name}) -> Name;
i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
+i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index d934a497..d49d0199 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"direct">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"direct">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
description() ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 77ca9686..e7f75464 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type fanout"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"fanout">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"fanout">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
description() ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index ec9e7ba4..caf141fe 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -42,9 +42,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type headers"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"headers">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"headers">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
-ifdef(use_specs).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index d3ecdd4d..44851858 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type topic"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"topic">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"topic">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
-export([topic_matches/2]).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
deleted file mode 100644
index cb53185f..00000000
--- a/src/rabbit_framing_channel.erl
+++ /dev/null
@@ -1,129 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_framing_channel).
--include("rabbit.hrl").
-
--export([start_link/3, process/2, shutdown/1]).
-
-%% internal
--export([mainloop/3]).
-
-%%--------------------------------------------------------------------
-
-start_link(Parent, ChannelPid, Protocol) ->
- {ok, proc_lib:spawn_link(
- fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
-
-process(Pid, Frame) ->
- Pid ! {frame, Frame},
- ok.
-
-shutdown(Pid) ->
- Pid ! terminate,
- ok.
-
-%%--------------------------------------------------------------------
-
-read_frame(ChannelPid) ->
- receive
- {frame, Frame} -> Frame;
- terminate -> rabbit_channel:shutdown(ChannelPid),
- read_frame(ChannelPid);
- Msg -> exit({unexpected_message, Msg})
- end.
-
-mainloop(Parent, ChannelPid, Protocol) ->
- case read_frame(ChannelPid) of
- {method, MethodName, FieldsBin} ->
- Method = Protocol:decode_method_fields(MethodName, FieldsBin),
- case Protocol:method_has_content(MethodName) of
- true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
- case collect_content(ChannelPid, ClassId, Protocol) of
- {ok, Content} ->
- rabbit_channel:do(ChannelPid, Method, Content),
- ?MODULE:mainloop(Parent, ChannelPid, Protocol);
- {error, Reason} ->
- channel_exit(Parent, Reason, MethodName)
- end;
- false -> rabbit_channel:do(ChannelPid, Method),
- ?MODULE:mainloop(Parent, ChannelPid, Protocol)
- end;
- _ ->
- channel_exit(Parent, {unexpected_frame,
- "expected method frame, "
- "got non method frame instead",
- []}, none)
- end.
-
-collect_content(ChannelPid, ClassId, Protocol) ->
- case read_frame(ChannelPid) of
- {content_header, ClassId, 0, BodySize, PropertiesBin} ->
- case collect_content_payload(ChannelPid, BodySize, []) of
- {ok, Payload} -> {ok, #content{
- class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- protocol = Protocol,
- payload_fragments_rev = Payload}};
- Error -> Error
- end;
- {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- {error, {unexpected_frame,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId]}};
- _ ->
- {error, {unexpected_frame,
- "expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId]}}
- end.
-
-collect_content_payload(_ChannelPid, 0, Acc) ->
- {ok, Acc};
-collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
- case read_frame(ChannelPid) of
- {content_body, FragmentBin} ->
- collect_content_payload(ChannelPid,
- RemainingByteCount - size(FragmentBin),
- [FragmentBin | Acc]);
- _ ->
- {error, {unexpected_frame,
- "expected content body, "
- "got non content body frame instead",
- []}}
- end.
-
-channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) ->
- Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params,
- MethodName),
- Parent ! {channel_exit, self(), Reason}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 52d76ac4..15ba787a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,7 +46,7 @@
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
--export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
+-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
@@ -72,7 +72,7 @@
-ifdef(use_specs).
--export_type([resource_name/0]).
+-export_type([resource_name/0, thunk/1]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -137,10 +137,9 @@
(atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(with_user/2 :: (rabbit_access_control:username(), thunk(A)) -> A).
--spec(with_vhost/2 :: (rabbit_types:vhost(), thunk(A)) -> A).
+-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 ::
- (rabbit_access_control:username(), rabbit_types:vhost(), thunk(A))
+ (rabbit_types:username(), rabbit_types:vhost(), thunk(A))
-> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
@@ -344,8 +343,8 @@ throw_on_error(E, Thunk) ->
with_exit_handler(Handler, Thunk) ->
try
Thunk()
- catch
- exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown ->
+ catch exit:{R, _} when R =:= noproc; R =:= nodedown;
+ R =:= normal; R =:= shutdown ->
Handler()
end.
@@ -366,19 +365,8 @@ with_user(Username, Thunk) ->
end
end.
-with_vhost(VHostPath, Thunk) ->
- fun () ->
- case mnesia:read({rabbit_vhost, VHostPath}) of
- [] ->
- mnesia:abort({no_such_vhost, VHostPath});
- [_V] ->
- Thunk()
- end
- end.
-
with_user_and_vhost(Username, VHostPath, Thunk) ->
- with_user(Username, with_vhost(VHostPath, Thunk)).
-
+ with_user(Username, rabbit_vhost:with(VHostPath, Thunk)).
execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
@@ -589,19 +577,19 @@ sort_field_table(Arguments) ->
pid_to_string(Pid) when is_pid(Pid) ->
%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
%% 8.7)
- <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
= term_to_binary(Pid),
Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
+ lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])).
%% inverse of above
string_to_pid(Str) ->
Err = {error, {invalid_pid_syntax, Str}},
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
- case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
+ case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$",
[{capture,all_but_first,list}]) of
- {match, [NodeStr, IdStr, SerStr]} ->
+ {match, [NodeStr, CreStr, IdStr, SerStr]} ->
%% the NodeStr atom might be quoted, so we have to parse
%% it rather than doing a simple list_to_atom
NodeAtom = case erl_scan:string(NodeStr) of
@@ -609,9 +597,9 @@ string_to_pid(Str) ->
{error, _, _} -> throw(Err)
end,
<<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
- Id = list_to_integer(IdStr),
- Ser = list_to_integer(SerStr),
- binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
+ [Cre, Id, Ser] = lists:map(fun list_to_integer/1,
+ [CreStr, IdStr, SerStr]),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>);
nomatch ->
throw(Err)
end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a62e7a6f..38cc82a6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -34,7 +34,8 @@
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, force_cluster/1, reset/0, force_reset/0,
- is_clustered/0, empty_ram_only_tables/0, copy_db/1]).
+ is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
+ empty_ram_only_tables/0, copy_db/1]).
-export([table_names/0]).
@@ -63,6 +64,8 @@
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(is_clustered/0 :: () -> boolean()).
+-spec(running_clustered_nodes/0 :: () -> [node()]).
+-spec(all_clustered_nodes/0 :: () -> [node()]).
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
@@ -81,12 +84,12 @@ status() ->
Nodes = nodes_of_type(CopyType),
Nodes =/= []
end];
- no -> case mnesia:system_info(db_nodes) of
+ no -> case all_clustered_nodes() of
[] -> [];
Nodes -> [{unknown, Nodes}]
end
end},
- {running_nodes, mnesia:system_info(running_db_nodes)}].
+ {running_nodes, running_clustered_nodes()}].
init() ->
ok = ensure_mnesia_running(),
@@ -127,9 +130,15 @@ reset() -> reset(false).
force_reset() -> reset(true).
is_clustered() ->
- RunningNodes = mnesia:system_info(running_db_nodes),
+ RunningNodes = running_clustered_nodes(),
[node()] /= RunningNodes andalso [] /= RunningNodes.
+all_clustered_nodes() ->
+ mnesia:system_info(db_nodes).
+
+running_clustered_nodes() ->
+ mnesia:system_info(running_db_nodes).
+
empty_ram_only_tables() ->
Node = node(),
lists:foreach(
@@ -154,10 +163,10 @@ nodes_of_type(Type) ->
table_definitions() ->
[{rabbit_user,
- [{record_name, user},
- {attributes, record_info(fields, user)},
+ [{record_name, internal_user},
+ {attributes, record_info(fields, internal_user)},
{disc_copies, [node()]},
- {match, #user{_='_'}}]},
+ {match, #internal_user{_='_'}}]},
{rabbit_user_permission,
[{record_name, user_permission},
{attributes, record_info(fields, user_permission)},
@@ -372,8 +381,7 @@ init_db(ClusterNodes, Force) ->
end;
true -> ok
end,
- case {Nodes, mnesia:system_info(use_dir),
- mnesia:system_info(db_nodes)} of
+ case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of
{[], true, [_]} ->
%% True single disc node, attempt upgrade
ok = wait_for_tables(),
@@ -388,7 +396,7 @@ init_db(ClusterNodes, Force) ->
ensure_version_ok(rabbit_upgrade:read_version()),
ensure_schema_ok();
{[], false, _} ->
- %% First RAM node in cluster, start from scratch
+ %% Nothing there at all, start from scratch
ok = create_schema();
{[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
@@ -566,8 +574,8 @@ reset(Force) ->
{Nodes, RunningNodes} =
try
ok = init(),
- {mnesia:system_info(db_nodes) -- [Node],
- mnesia:system_info(running_db_nodes) -- [Node]}
+ {all_clustered_nodes() -- [Node],
+ running_clustered_nodes() -- [Node]}
after
mnesia:stop()
end,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e8b4e8e2..f8b41ed3 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -81,6 +81,7 @@
file_summary_ets, %% tid of the file summary table
dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
+ dying_clients, %% set of dying clients
client_refs, %% set of references of all registered clients
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
@@ -306,6 +307,17 @@
%% sure that reads are not attempted from files which are in the
%% process of being garbage collected.
%%
+%% When a message is removed, its reference count is decremented. Even
+%% if the reference count becomes 0, its entry is not removed. This is
+%% because in the event of the same message being sent to several
+%% different queues, there is the possibility of one queue writing and
+%% removing the message before other queues write it at all. Thus
+%% accomodating 0-reference counts allows us to avoid unnecessary
+%% writes here. Of course, there are complications: the file to which
+%% the message has already been written could be locked pending
+%% deletion or GC, which means we have to rewrite the message as the
+%% original copy will now be lost.
+%%
%% The server automatically defers reads, removes and contains calls
%% that occur which refer to files which are currently being
%% GC'd. Contains calls are only deferred in order to ensure they do
@@ -323,6 +335,55 @@
%% heavily overloaded, clients can still write and read messages with
%% very low latency and not block at all.
%%
+%% Clients of the msg_store are required to register before using the
+%% msg_store. This provides them with the necessary client-side state
+%% to allow them to directly access the various caches and files. When
+%% they terminate, they should deregister. They can do this by calling
+%% either client_terminate/1 or client_delete_and_terminate/1. The
+%% differences are: (a) client_terminate is synchronous. As a result,
+%% if the msg_store is badly overloaded and has lots of in-flight
+%% writes and removes to process, this will take some time to
+%% return. However, once it does return, you can be sure that all the
+%% actions you've issued to the msg_store have been processed. (b) Not
+%% only is client_delete_and_terminate/1 asynchronous, but it also
+%% permits writes and subsequent removes from the current
+%% (terminating) client which are still in flight to be safely
+%% ignored. Thus from the point of view of the msg_store itself, and
+%% all from the same client:
+%%
+%% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N
+%% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 -->
+%%
+%% The client obviously sent T after all the other messages (up to
+%% W4), but because the msg_store prioritises messages, the T can be
+%% promoted and thus received early.
+%%
+%% Thus at the point of the msg_store receiving T, we have messages 1
+%% and 2 with a refcount of 1. After T, W3 will be ignored because
+%% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be
+%% ignored because the messages that they refer to were already known
+%% to the msg_store prior to T. However, it can be a little more
+%% complex: after the first R2, the refcount of msg 2 is 0. At that
+%% point, if a GC occurs or file deletion, msg 2 could vanish, which
+%% would then mean that the subsequent W2 and R2 are then ignored.
+%%
+%% The use case then for client_delete_and_terminate/1 is if the
+%% client wishes to remove everything it's written to the msg_store:
+%% it issues removes for all messages it's written and not removed,
+%% and then calls client_delete_and_terminate/1. At that point, any
+%% in-flight writes (and subsequent removes) can be ignored, but
+%% removes and writes for messages the msg_store already knows about
+%% will continue to be processed normally (which will normally just
+%% involve modifying the reference count, which is fast). Thus we save
+%% disk bandwidth for writes which are going to be immediately removed
+%% again by the the terminating client.
+%%
+%% We use a separate set to keep track of the dying clients in order
+%% to keep that set, which is inspected on every write and remove, as
+%% small as possible. Inspecting client_refs - the set of all clients
+%% - would degrade performance with many healthy clients and few, if
+%% any, dying clients, which is the typical case.
+%%
%% For notes on Clean Shutdown and startup, see documentation in
%% variable_queue.
@@ -361,6 +422,7 @@ client_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
+ ok = server_cast(CState, {client_dying, Ref}),
ok = server_cast(CState, {client_delete, Ref}).
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
@@ -598,6 +660,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
+ dying_clients = sets:new(),
client_refs = ClientRefs1,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
@@ -643,6 +706,7 @@ prioritise_cast(Msg, _State) ->
{combine_files, _Source, _Destination, _Reclaimed} -> 8;
{delete_file, _File, _Reclaimed} -> 8;
{set_maximum_since_use, _Age} -> 8;
+ {client_dying, _Pid} -> 7;
_ -> 0
end.
@@ -681,65 +745,65 @@ handle_call({contains, Guid}, From, State) ->
State1 = contains_message(Guid, From, State),
noreply(State1).
+handle_cast({client_dying, CRef},
+ State = #msstate { dying_clients = DyingClients }) ->
+ DyingClients1 = sets:add_element(CRef, DyingClients),
+ write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
+
handle_cast({client_delete, CRef},
- State = #msstate { client_refs = ClientRefs }) ->
- State1 = clear_client_callback(CRef, State),
- noreply(State1 #msstate {
- client_refs = sets:del_element(CRef, ClientRefs) });
+ State = #msstate { client_refs = ClientRefs,
+ dying_clients = DyingClients }) ->
+ State1 = clear_client_callback(
+ CRef, State #msstate {
+ client_refs = sets:del_element(CRef, ClientRefs),
+ dying_clients = sets:del_element(CRef, DyingClients) }),
+ noreply(remove_message(CRef, CRef, State1));
handle_cast({write, CRef, Guid},
- State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
- current_file = CurFile,
+ State = #msstate { file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- CTG1 = case dict:find(CRef, CODC) of
- {ok, _} -> dict:update(CRef, fun(Guids) ->
- gb_sets:add(Guid, Guids)
- end,
- gb_sets:empty(), CTG);
- error -> CTG
- end,
+ CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC),
State1 = State #msstate { cref_to_guids = CTG1 },
- case index_lookup(Guid, State1) of
- not_found ->
+ case should_mask_action(CRef, Guid, State) of
+ {true, _Location} ->
+ noreply(State);
+ {false, not_found} ->
write_message(Guid, Msg, State1);
- #msg_location { ref_count = 0, file = File, total_size = TotalSize } ->
- case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
+ {Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize }} ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
ok = index_delete(Guid, State1),
write_message(Guid, Msg, State1);
- [#file_summary {}] ->
- ok = index_update_ref_count(Guid, 1, State1),
- [_] = ets:update_counter(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, TotalSize}]),
- noreply(State1 #msstate {
- sum_valid_data = SumValid + TotalSize })
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but as it is being GC'd currently,
+ %% we'll have to write a new copy, which will then
+ %% be younger, so ignore this write.
+ noreply(State);
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ State2 = client_confirm_if_on_disk(CRef, Guid, File, State),
+ noreply(adjust_valid_total_size(File, TotalSize, State2))
end;
- #msg_location { ref_count = RefCount, file = File } ->
+ {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State1),
- CTG2 = case {dict:find(CRef, CODC), File} of
- {{ok, _}, CurFile} -> CTG1;
- {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)),
- CTG;
- _ -> CTG1
- end,
- noreply(State #msstate { cref_to_guids = CTG2 })
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
end;
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
- fun (Guid, State2) -> remove_message(Guid, State2) end,
+ fun (Guid, State2) -> remove_message(Guid, CRef, State2) end,
State, Guids),
- State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
- noreply(maybe_compact(State2));
+ noreply(maybe_compact(
+ client_confirm(CRef, gb_sets:from_list(Guids), removed, State1)));
handle_cast({release, Guids}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -861,9 +925,9 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #msstate { sync_timer_ref = undefined }.
-internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs,
- cref_to_guids = CTG }) ->
+internal_sync(State = #msstate { current_file_handle = CurHdl,
+ on_sync = Syncs,
+ cref_to_guids = CTG }) ->
State1 = stop_sync_timer(State),
CGs = dict:fold(fun (CRef, Guids, NS) ->
case gb_sets:is_empty(Guids) of
@@ -871,14 +935,14 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
false -> [{CRef, Guids} | NS]
end
end, [], CTG),
- if Syncs =:= [] andalso CGs =:= [] -> ok;
- true -> file_handle_cache:sync(CurHdl)
+ case {Syncs, CGs} of
+ {[], []} -> ok;
+ _ -> file_handle_cache:sync(CurHdl)
end,
- lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs],
+ [K() || K <- lists:reverse(Syncs)],
+ [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
-
write_message(Guid, Msg,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
@@ -990,34 +1054,43 @@ contains_message(Guid, From,
end
end.
-remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
- #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize } =
- index_lookup_positive_ref_count(Guid, State),
- %% only update field, otherwise bad interaction with concurrent GC
- Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
- case RefCount of
- %% don't remove from CUR_FILE_CACHE_ETS_NAME here because
- %% there may be further writes in the mailbox for the same
- %% msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
- case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true } ] ->
- add_to_pending_gc_completion({remove, Guid}, File, State);
- [#file_summary {}] ->
+remove_message(Guid, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts }) ->
+ case should_mask_action(CRef, Guid, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg
+ %% whilst it's being GC'd. ASSERTION:
+ %% [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }} when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec =
+ fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from CUR_FILE_CACHE_ETS_NAME here
+ %% because there may be further writes in the mailbox
+ %% for the same msg.
+ 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
+ case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, Guid, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(File, -TotalSize,
+ State))
+ end;
+ _ -> ok = decrement_cache(DedupCacheEts, Guid),
ok = Dec(),
- [_] = ets:update_counter(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, -TotalSize}]),
- delete_file_if_empty(
- File, State #msstate {
- sum_valid_data = SumValid - TotalSize })
- end;
- _ -> ok = decrement_cache(DedupCacheEts, Guid),
- ok = Dec(),
- State
+ State
+ end
end.
add_to_pending_gc_completion(
@@ -1039,8 +1112,8 @@ run_pending_action({read, Guid, From}, State) ->
read_message(Guid, From, State);
run_pending_action({contains, Guid, From}, State) ->
contains_message(Guid, From, State);
-run_pending_action({remove, Guid}, State) ->
- remove_message(Guid, State).
+run_pending_action({remove, Guid, CRef}, State) ->
+ remove_message(Guid, CRef, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
try
@@ -1051,15 +1124,22 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
+adjust_valid_total_size(File, Delta, State = #msstate {
+ sum_valid_data = SumValid,
+ file_summary_ets = FileSummaryEts }) ->
+ [_] = ets:update_counter(FileSummaryEts, File,
+ [{#file_summary.valid_total_size, Delta}]),
+ State #msstate { sum_valid_data = SumValid + Delta }.
+
orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-client_confirm(CRef, Guids,
+client_confirm(CRef, Guids, ActionTaken,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(Guids),
+ {ok, Fun} -> Fun(Guids, ActionTaken),
CTG1 = case dict:find(CRef, CTG) of
{ok, Gs} ->
Guids1 = gb_sets:difference(Gs, Guids),
@@ -1073,6 +1153,52 @@ client_confirm(CRef, Guids,
error -> State
end.
+add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) ->
+ case dict:find(CRef, CODC) of
+ {ok, _} -> dict:update(CRef,
+ fun (Guids) -> gb_sets:add(Guid, Guids) end,
+ gb_sets:singleton(Guid), CTG);
+ error -> CTG
+ end.
+
+client_confirm_if_on_disk(CRef, Guid, File,
+ State = #msstate { client_ondisk_callback = CODC,
+ current_file = CurFile,
+ cref_to_guids = CTG }) ->
+ CTG1 =
+ case File of
+ CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC);
+ _ -> case dict:find(CRef, CODC) of
+ {ok, Fun} -> Fun(gb_sets:singleton(Guid), written);
+ _ -> ok
+ end,
+ CTG
+ end,
+ State #msstate { cref_to_guids = CTG1 }.
+
+%% Detect whether the Guid is older or younger than the client's death
+%% msg (if there is one). If the msg is older than the client death
+%% msg, and it has a 0 ref_count we must only alter the ref_count, not
+%% rewrite the msg - rewriting it would make it younger than the death
+%% msg and thus should be ignored. Note that this (correctly) returns
+%% false when testing to remove the death msg itself.
+should_mask_action(CRef, Guid,
+ State = #msstate { dying_clients = DyingClients }) ->
+ case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of
+ {false, Location} ->
+ {false, Location};
+ {true, not_found} ->
+ {true, not_found};
+ {true, #msg_location { file = File, offset = Offset,
+ ref_count = RefCount } = Location} ->
+ #msg_location { file = DeathFile, offset = DeathOffset } =
+ index_lookup(CRef, State),
+ {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
+ {true, _} -> true;
+ {false, 0} -> false_if_increment;
+ {false, _} -> false
+ end, Location}
+ end.
%%----------------------------------------------------------------------------
%% file helper functions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 89954b06..c6a083bb 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -32,7 +32,7 @@
-module(rabbit_net).
-include("rabbit.hrl").
--export([is_ssl/1, controlling_process/2, getstat/2,
+-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
async_recv/3, port_command/2, send/2, close/1,
sockname/1, peername/1, peercert/1]).
@@ -50,6 +50,9 @@
-type(socket() :: port() | #ssl_socket{}).
-spec(is_ssl/1 :: (socket()) -> boolean()).
+-spec(ssl_info/1 :: (socket())
+ -> 'nossl' | ok_val_or_error(
+ {atom(), {atom(), atom(), atom()}})).
-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
-spec(getstat/2 ::
(socket(), [stat_option()])
@@ -77,6 +80,11 @@
is_ssl(Sock) -> ?IS_SSL(Sock).
+ssl_info(Sock) when ?IS_SSL(Sock) ->
+ ssl:connection_info(Sock#ssl_socket.ssl);
+ssl_info(_Sock) ->
+ nossl.
+
controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
controlling_process(Sock, Pid) when is_port(Sock) ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 1c542ffe..d5a9d73c 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -246,8 +246,9 @@ start_ssl_client(SslOpts, Sock) ->
connections() ->
[rabbit_connection_sup:reader(ConnSup) ||
+ Node <- rabbit_mnesia:running_clustered_nodes(),
{_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup)].
+ <- supervisor:which_children({rabbit_tcp_client_sup, Node})].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 867ecb12..8ae45abd 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -53,7 +53,7 @@ start() ->
io:format("Activating RabbitMQ plugins ...~n"),
%% Determine our various directories
- [PluginDir, UnpackedPluginDir, Node] = init:get_plain_arguments(),
+ [PluginDir, UnpackedPluginDir, NodeStr] = init:get_plain_arguments(),
RootName = UnpackedPluginDir ++ "/rabbit",
%% Unpack any .ez plugins
@@ -132,7 +132,7 @@ start() ->
|| App <- PluginApps],
io:nl(),
- ok = duplicate_node_check(Node),
+ ok = duplicate_node_check(NodeStr),
terminate(0),
ok.
@@ -259,7 +259,8 @@ process_entry(Entry) ->
duplicate_node_check([]) ->
%% Ignore running node while installing windows service
ok;
-duplicate_node_check(Node) ->
+duplicate_node_check(NodeStr) ->
+ Node = rabbit_misc:makenode(NodeStr),
{NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
case net_adm:names(NodeHost) of
{ok, NamePorts} ->
@@ -272,7 +273,6 @@ duplicate_node_check(Node) ->
terminate(?ERROR_CODE);
false -> ok
end;
- {error, address} -> ok;
{error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
[EpmdReason])
end.
@@ -283,12 +283,9 @@ terminate(Fmt, Args) ->
terminate(Status) ->
case os:type() of
- {unix, _} ->
- halt(Status);
- {win32, _} ->
- init:stop(Status),
- receive
- after infinity -> ok
- end
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status),
+ receive
+ after infinity -> ok
+ end
end.
-
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76c0a4ef..2162104f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -33,7 +33,7 @@
-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
+ publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
@@ -297,11 +297,12 @@ deliver(SeqIds, State) ->
ack(SeqIds, State) ->
deliver_or_ack(ack, SeqIds, State).
-sync([], State) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+%% This is only called when there are outstanding confirms and the
+%% queue is idle.
+sync(State = #qistate { unsynced_guids = Guids }) ->
+ sync_if([] =/= Guids, State).
+
+sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack in
%% the transaction. Ideally we should go through these seqids and
%% only sync the journal if the pubs or acks appear in the
@@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% the variable queue publishes and acks to the qi, and then
%% syncs, all in one operation, there is no possibility of the
%% seqids not being in the journal, provided the transaction isn't
- %% emptied (handled above anyway).
- ok = file_handle_cache:sync(JournalHdl),
- notify_sync(State).
+ %% emptied (handled by sync_if anyway).
+ sync_if([] =/= SeqIds, State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+sync_if(false, State) ->
+ State;
+sync_if(_Bool, State = #qistate { journal_handle = undefined }) ->
+ State;
+sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ notify_sync(State).
+
notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(gb_sets:from_list(UG)),
State #qistate { unsynced_guids = [] }.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index cdb3586a..05e383d8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -41,7 +41,7 @@
-export([conserve_memory/2, server_properties/0]).
--export([analyze_frame/3]).
+-export([process_channel_frame/5]). %% used by erlang-client
-export([emit_stats/1]).
@@ -55,14 +55,17 @@
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun}).
+ channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism,
+ auth_state}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
- peer_cert_validity,
+ peer_cert_validity, auth_mechanism,
+ ssl_protocol, ssl_key_exchange,
+ ssl_cipher, ssl_hash,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -293,7 +296,9 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
stats_timer =
rabbit_event:init_stats_timer(),
channel_sup_sup_pid = ChannelSupSupPid,
- start_heartbeat_fun = StartHeartbeatFun
+ start_heartbeat_fun = StartHeartbeatFun,
+ auth_mechanism = none,
+ auth_state = none
},
handshake, 8))
catch
@@ -343,12 +348,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
- {channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
+ {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, ChannelOrFrPid, Reason} ->
- mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
+ {channel_exit, Channel, Reason} ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -439,45 +444,32 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
-handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
- {channel, Channel} = get({ch_fr_pid, ChFrPid}),
- handle_exception(State, Channel, Reason);
-handle_channel_exit(Channel, Reason, State) ->
- handle_exception(State, Channel, Reason).
-
-handle_dependent_exit(ChSupPid, Reason, State) ->
+handle_dependent_exit(ChPid, Reason, State) ->
case termination_kind(Reason) of
controlled ->
- case erase({ch_sup_pid, ChSupPid}) of
- undefined -> ok;
- {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
- end,
+ erase({ch_pid, ChPid}),
maybe_close(State);
uncontrolled ->
- case channel_cleanup(ChSupPid) of
- undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
- Channel ->
- maybe_close(handle_exception(State, Channel, Reason))
+ case channel_cleanup(ChPid) of
+ undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
+ Channel -> maybe_close(
+ handle_exception(State, Channel, Reason))
end
end.
-channel_cleanup(ChSupPid) ->
- case get({ch_sup_pid, ChSupPid}) of
- undefined -> undefined;
- {{channel, Channel}, ChFr} -> erase({channel, Channel}),
- erase(ChFr),
- erase({ch_sup_pid, ChSupPid}),
- Channel
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ Channel -> erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ Channel
end.
-all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
- {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()].
terminate_channels() ->
NChannels =
- length([rabbit_framing_channel:shutdown(ChFrPid)
- || ChFrPid <- all_channels()]),
+ length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -495,10 +487,10 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- case channel_cleanup(ChSupPid) of
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ case channel_cleanup(ChPid) of
undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
+ exit({abnormal_dependent_exit, ChPid, Reason});
Channel ->
case termination_kind(Reason) of
controlled ->
@@ -529,15 +521,13 @@ maybe_close(State) ->
State.
termination_kind(normal) -> controlled;
-termination_kind(shutdown) -> controlled;
-termination_kind({shutdown, _Term}) -> controlled;
termination_kind(_) -> uncontrolled.
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -547,7 +537,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
State;
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
{method, MethodName, FieldsBin} ->
@@ -556,19 +546,23 @@ handle_frame(Type, 0, Payload,
end;
handle_frame(Type, Channel, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ch_fr_pid, ChFrPid} ->
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
+ {ChPid, FramingState} ->
+ NewAState = process_channel_frame(
+ AnalyzedFrame, self(),
+ Channel, ChPid, FramingState),
+ put({channel, Channel}, {ChPid, NewAState}),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
State;
{method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking andalso
+ case (State#v1.connection_state =:= blocking
+ andalso
Protocol:method_has_content(MethodName)) of
true -> State#v1{connection_state = blocked};
false -> State
@@ -597,9 +591,8 @@ handle_frame(Type, Channel, Payload,
State;
undefined ->
case ?IS_RUNNING(State) of
- true -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
+ true -> send_to_new_channel(
+ Channel, AnalyzedFrame, State);
false -> throw({channel_frame_while_starting,
Channel, State#v1.connection_state,
AnalyzedFrame})
@@ -607,22 +600,6 @@ handle_frame(Type, Channel, Payload,
end
end.
-analyze_frame(?FRAME_METHOD,
- <<ClassId:16, MethodId:16, MethodFields/binary>>,
- Protocol) ->
- MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
- {method, MethodName, MethodFields};
-analyze_frame(?FRAME_HEADER,
- <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
- _Protocol) ->
- {content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body, _Protocol) ->
- {content_body, Body};
-analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
- heartbeat;
-analyze_frame(_Type, _Body, _Protocol) ->
- error.
-
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
@@ -680,11 +657,12 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
- Start = #'connection.start'{ version_major = ProtocolMajor,
- version_minor = ProtocolMinor,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> },
+ Start = #'connection.start'{
+ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = auth_mechanisms_binary(),
+ locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
switch_callback(State#v1{connection = Connection#connection{
timeout_sec = ?NORMAL_TIMEOUT,
@@ -709,42 +687,45 @@ ensure_stats_timer(State) ->
handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- try
- handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
- State)
- catch exit:Reason ->
- CompleteReason = case Reason of
- #amqp_error{method = none} ->
- Reason#amqp_error{method = MethodName};
- OtherReason -> OtherReason
- end,
+ HandleException =
+ fun(R) ->
case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, CompleteReason);
+ true -> send_exception(State, 0, R);
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state,
- CompleteReason})
+ throw({channel0_error, State#v1.connection_state, R})
end
+ end,
+ try
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
+ State)
+ catch exit:#amqp_error{method = none} = Reason ->
+ HandleException(Reason#amqp_error{method = MethodName});
+ Type:Reason ->
+ HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
- State = #v1{connection_state = starting,
- connection = Connection =
- #connection{protocol = Protocol},
- sock = Sock}) ->
- User = rabbit_access_control:check_login(Mechanism, Response),
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = server_frame_max(),
- heartbeat = 0},
- ok = send_on_channel0(Sock, Tune, Protocol),
- State#v1{connection_state = tuning,
- connection = Connection#connection{
- user = User,
- client_properties = ClientProperties}};
+ State0 = #v1{connection_state = starting,
+ connection = Connection,
+ sock = Sock}) ->
+ AuthMechanism = auth_mechanism_to_module(Mechanism),
+ State = State0#v1{auth_mechanism = AuthMechanism,
+ auth_state = AuthMechanism:init(Sock),
+ connection_state = securing,
+ connection =
+ Connection#connection{
+ client_properties = ClientProperties}},
+ auth_phase(Response, State);
+
+handle_method0(#'connection.secure_ok'{response = Response},
+ State = #v1{connection_state = securing}) ->
+ auth_phase(Response, State);
+
handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
@@ -761,17 +742,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ServerFrameMax]);
true ->
- SendFun =
- fun() ->
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- catch rabbit_net:send(Sock, Frame)
- end,
-
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
- ReceiveFun =
- fun() ->
- Parent ! timeout
- end,
+ ReceiveFun = fun() -> Parent ! timeout end,
Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
@@ -802,7 +776,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
- lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection_state = CS,
@@ -833,6 +807,61 @@ server_frame_max() ->
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
+auth_mechanism_to_module(TypeBin) ->
+ case rabbit_registry:binary_to_type(TypeBin) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unknown authentication mechanism '~s'",
+ [TypeBin]);
+ T ->
+ case {lists:member(T, auth_mechanisms()),
+ rabbit_registry:lookup_module(auth_mechanism, T)} of
+ {true, {ok, Module}} ->
+ Module;
+ _ ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid authentication mechanism '~s'", [T])
+ end
+ end.
+
+auth_mechanisms() ->
+ {ok, Configured} = application:get_env(auth_mechanisms),
+ [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism),
+ lists:member(Name, Configured)].
+
+auth_mechanisms_binary() ->
+ list_to_binary(
+ string:join(
+ [atom_to_list(A) || A <- auth_mechanisms()], " ")).
+
+auth_phase(Response,
+ State = #v1{auth_mechanism = AuthMechanism,
+ auth_state = AuthState,
+ connection = Connection =
+ #connection{protocol = Protocol},
+ sock = Sock}) ->
+ case AuthMechanism:handle_response(Response, AuthState) of
+ {refused, Msg, Args} ->
+ rabbit_misc:protocol_error(
+ access_refused, "~s login refused: ~s",
+ [proplists:get_value(name, AuthMechanism:description()),
+ io_lib:format(Msg, Args)]);
+ {protocol_error, Msg, Args} ->
+ rabbit_misc:protocol_error(syntax_error, Msg, Args);
+ {challenge, Challenge, AuthState1} ->
+ Secure = #'connection.secure'{challenge = Challenge},
+ ok = send_on_channel0(Sock, Secure, Protocol),
+ State#v1{auth_state = AuthState1};
+ {ok, User} ->
+ Tune = #'connection.tune'{channel_max = 0,
+ frame_max = server_frame_max(),
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
+ State#v1{connection_state = tuning,
+ connection = Connection#connection{user = User}}
+ end.
+
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -849,6 +878,14 @@ i(peer_port, #v1{sock = Sock}) ->
socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
i(ssl, #v1{sock = Sock}) ->
rabbit_net:is_ssl(Sock);
+i(ssl_protocol, #v1{sock = Sock}) ->
+ ssl_info(fun ({P, _}) -> P end, Sock);
+i(ssl_key_exchange, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {K, _, _}}) -> K end, Sock);
+i(ssl_cipher, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {_, C, _}}) -> C end, Sock);
+i(ssl_hash, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {_, _, H}}) -> H end, Sock);
i(peer_cert_issuer, #v1{sock = Sock}) ->
cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
i(peer_cert_subject, #v1{sock = Sock}) ->
@@ -870,6 +907,10 @@ i(protocol, #v1{connection = #connection{protocol = none}}) ->
none;
i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
Protocol:version();
+i(auth_mechanism, #v1{auth_mechanism = none}) ->
+ none;
+i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+ proplists:get_value(name, Mechanism:description());
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
@@ -895,6 +936,13 @@ socket_info(Get, Select) ->
{error, _} -> ''
end.
+ssl_info(F, Sock) ->
+ case rabbit_net:ssl_info(Sock) of
+ nossl -> '';
+ {error, _} -> '';
+ {ok, Info} -> F(Info)
+ end.
+
cert_info(F, Sock) ->
case rabbit_net:peercert(Sock) of
nossl -> '';
@@ -909,17 +957,31 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
channel_sup_sup_pid = ChanSupSup,
connection = #connection{protocol = Protocol,
frame_max = FrameMax,
- user = #user{username = Username},
+ user = User,
vhost = VHost}} = State,
- {ok, ChSupPid, ChFrPid} =
+ {ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {Protocol, Sock, Channel, FrameMax,
- self(), Username, VHost, Collector}),
- erlang:monitor(process, ChSupPid),
- put({channel, Channel}, {ch_fr_pid, ChFrPid}),
- put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
- put({ch_fr_pid, ChFrPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
+ self(), User, VHost, Collector}),
+ erlang:monitor(process, ChPid),
+ NewAState = process_channel_frame(AnalyzedFrame, self(),
+ Channel, ChPid, AState),
+ put({channel, Channel}, {ChPid, NewAState}),
+ put({ch_pid, ChPid}, Channel),
+ State.
+
+process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> NewAState;
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ NewAState;
+ {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
+ Method, Content),
+ NewAState;
+ {error, Reason} -> ErrPid ! {channel_exit, Channel,
+ Reason},
+ AState
+ end.
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_registry.erl
index f15275b5..7a3fcb51 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_registry.erl
@@ -29,7 +29,7 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_exchange_type_registry).
+-module(rabbit_registry).
-behaviour(gen_server).
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([register/2, binary_to_type/1, lookup_module/1]).
+-export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
@@ -46,11 +46,12 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(register/2 :: (binary(), atom()) -> 'ok').
+-spec(register/3 :: (atom(), binary(), atom()) -> 'ok').
-spec(binary_to_type/1 ::
(binary()) -> atom() | rabbit_types:error('not_found')).
--spec(lookup_module/1 ::
- (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+-spec(lookup_module/2 ::
+ (atom(), atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+-spec(lookup_all/1 :: (atom()) -> [{atom(), atom()}]).
-endif.
@@ -61,8 +62,8 @@ start_link() ->
%%---------------------------------------------------------------------------
-register(TypeName, ModuleName) ->
- gen_server:call(?SERVER, {register, TypeName, ModuleName}).
+register(Class, TypeName, ModuleName) ->
+ gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
@@ -74,47 +75,54 @@ binary_to_type(TypeBin) when is_binary(TypeBin) ->
TypeAtom -> TypeAtom
end.
-lookup_module(T) when is_atom(T) ->
- case ets:lookup(?ETS_NAME, T) of
+lookup_module(Class, T) when is_atom(T) ->
+ case ets:lookup(?ETS_NAME, {Class, T}) of
[{_, Module}] ->
{ok, Module};
[] ->
{error, not_found}
end.
+lookup_all(Class) ->
+ [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})].
+
%%---------------------------------------------------------------------------
internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
list_to_atom(binary_to_list(TypeBin)).
-internal_register(TypeName, ModuleName)
- when is_binary(TypeName), is_atom(ModuleName) ->
- ok = sanity_check_module(ModuleName),
+internal_register(Class, TypeName, ModuleName)
+ when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) ->
+ ok = sanity_check_module(class_module(Class), ModuleName),
true = ets:insert(?ETS_NAME,
- {internal_binary_to_type(TypeName), ModuleName}),
+ {{Class, internal_binary_to_type(TypeName)}, ModuleName}),
ok.
-sanity_check_module(Module) ->
- case catch lists:member(rabbit_exchange_type,
+sanity_check_module(ClassModule, Module) ->
+ case catch lists:member(ClassModule,
lists:flatten(
[Bs || {Attr, Bs} <-
Module:module_info(attributes),
Attr =:= behavior orelse
Attr =:= behaviour])) of
{'EXIT', {undef, _}} -> {error, not_module};
- false -> {error, not_exchange_type};
+ false -> {error, {not_type, ClassModule}};
true -> ok
end.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism.
+
%%---------------------------------------------------------------------------
init([]) ->
?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
{ok, none}.
-handle_call({register, TypeName, ModuleName}, _From, State) ->
- ok = internal_register(TypeName, ModuleName),
+handle_call({register, Class, TypeName, ModuleName}, _From, State) ->
+ ok = internal_register(Class, TypeName, ModuleName),
{reply, ok, State};
+
handle_call(Request, _From, State) ->
{stop, {unhandled_call, Request}, State}.
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 1d8ce23b..a4da23e2 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -36,6 +36,7 @@
-include_lib("public_key/include/public_key.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
+-export([peer_cert_subject_item/2]).
%%--------------------------------------------------------------------------
@@ -45,9 +46,11 @@
-type(certificate() :: binary()).
--spec(peer_cert_issuer/1 :: (certificate()) -> string()).
--spec(peer_cert_subject/1 :: (certificate()) -> string()).
--spec(peer_cert_validity/1 :: (certificate()) -> string()).
+-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject/1 :: (certificate()) -> string()).
+-spec(peer_cert_validity/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject_item/2 ::
+ (certificate(), tuple()) -> string() | 'not_found').
-endif.
@@ -71,6 +74,14 @@ peer_cert_subject(Cert) ->
format_rdn_sequence(Subject)
end, Cert).
+%% Return a part of the certificate's subject.
+peer_cert_subject_item(Cert, Type) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ subject = Subject }}) ->
+ find_by_type(Type, Subject)
+ end, Cert).
+
%% Return a string describing the certificate's validity.
peer_cert_validity(Cert) ->
cert_info(fun(#'OTPCertificate' {
@@ -89,6 +100,14 @@ cert_info(F, Cert) ->
DecCert -> DecCert %%R14B onwards
end).
+find_by_type(Type, {rdnSequence, RDNs}) ->
+ case [V || #'AttributeTypeAndValue'{type = T, value = V}
+ <- lists:flatten(RDNs),
+ T == Type] of
+ [{printableString, S}] -> S;
+ [] -> not_found
+ end.
+
%%--------------------------------------------------------------------------
%% Formatting functions
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index adf968cb..d913092c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -96,6 +96,22 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
+ %% we now run the tests remotely, so that code coverage on the
+ %% local node picks up more of the delegate
+ Node = node(),
+ Self = self(),
+ Remote = spawn(SecondaryNode,
+ fun () -> A = test_delegates_async(Node),
+ B = test_delegates_sync(Node),
+ Self ! {self(), {A, B}}
+ end),
+ receive
+ {Remote, Result} ->
+ Result = {passed, passed}
+ after 2000 ->
+ throw(timeout)
+ end,
+
passed.
test_priority_queue() ->
@@ -1014,7 +1030,7 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- <<"user">>, <<"/">>, self(),
+ user(<<"user">>), <<"/">>, self(),
fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
@@ -1074,7 +1090,7 @@ test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
{ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
- <<"guest">>, <<"/">>, self(),
+ user(<<"guest">>), <<"/">>, self(),
fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
@@ -1082,6 +1098,13 @@ test_spawn(Receiver) ->
end,
{Writer, Ch}.
+user(Username) ->
+ #user{username = Username,
+ is_admin = true,
+ auth_backend = rabbit_auth_backend_internal,
+ impl = #internal_user{username = Username,
+ is_admin = true}}.
+
test_statistics_receiver(Pid) ->
receive
shutdown ->
@@ -1247,15 +1270,26 @@ test_delegates_sync(SecondaryNode) ->
true = lists:all(fun ({_, response}) -> true end, GoodRes),
GoodResPids = [Pid || {Pid, _} <- GoodRes],
- Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids),
- Good = ordsets:from_list(GoodResPids),
+ Good = lists:usort(LocalGoodPids ++ RemoteGoodPids),
+ Good = lists:usort(GoodResPids),
{[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender),
true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes),
BadResPids = [Pid || {Pid, _} <- BadRes],
- Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids),
- Bad = ordsets:from_list(BadResPids),
+ Bad = lists:usort(LocalBadPids ++ RemoteBadPids),
+ Bad = lists:usort(BadResPids),
+
+ MagicalPids = [rabbit_misc:string_to_pid(Str) ||
+ Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]],
+ {[], BadNodes} = delegate:invoke(MagicalPids, Sender),
+ true = lists:all(
+ fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end,
+ BadNodes),
+ BadNodesPids = [Pid || {Pid, _} <- BadNodes],
+
+ Magical = lists:usort(MagicalPids),
+ Magical = lists:usort(BadNodesPids),
passed.
@@ -1662,7 +1696,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
false -> ?TRANSIENT_MSG_STORE
end,
MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined),
- {A, B} =
+ {A, B = [{_SeqId, LastGuidWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
Guid = rabbit_guid:guid(),
@@ -1671,6 +1705,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
ok = rabbit_msg_store:write(Guid, Guid, MSCState),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]}
end, {Qi, []}, SeqIds),
+ %% do this just to force all of the publishes through to the msg_store:
+ true = rabbit_msg_store:contains(LastGuidWritten, MSCState),
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
{A, B}.
@@ -1854,7 +1890,7 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
VQ = rabbit_variable_queue:init(test_queue(), true, false,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1956,7 +1992,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1966,7 +2002,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2000,7 +2036,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2030,7 +2066,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2047,7 +2083,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2078,7 +2114,7 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = rabbit_variable_queue:init(QName, true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
@@ -2140,3 +2176,4 @@ test_configurable_server_properties() ->
passed.
nop(_) -> ok.
+nop(_, _) -> ok.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 548014be..70d18d7a 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -42,8 +42,9 @@
vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0,
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
- connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1,
- ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
+ connection/0, protocol/0, user/0, internal_user/0,
+ username/0, password/0, password_hash/0, ok/1, error/1,
+ ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
connection_exit/0]).
-type(channel_exit() :: no_return()).
@@ -151,9 +152,19 @@
-type(protocol() :: rabbit_framing:protocol()).
-type(user() ::
- #user{username :: rabbit_access_control:username(),
- password_hash :: rabbit_access_control:password_hash(),
- is_admin :: boolean()}).
+ #user{username :: username(),
+ is_admin :: boolean(),
+ auth_backend :: atom(),
+ impl :: any()}).
+
+-type(internal_user() ::
+ #internal_user{username :: username(),
+ password_hash :: password_hash(),
+ is_admin :: boolean()}).
+
+-type(username() :: binary()).
+-type(password() :: binary()).
+-type(password_hash() :: binary()).
-type(ok(A) :: {'ok', A}).
-type(error(A) :: {'error', A}).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 1c56d51d..b5ff2b12 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -27,6 +27,8 @@
-rabbit_upgrade({remove_user_scope, []}).
-rabbit_upgrade({hash_passwords, []}).
-rabbit_upgrade({add_ip_to_listener, []}).
+-rabbit_upgrade({internal_exchanges, []}).
+-rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
%% -------------------------------------------------------------------
@@ -35,6 +37,8 @@
-spec(remove_user_scope/0 :: () -> 'ok').
-spec(hash_passwords/0 :: () -> 'ok').
-spec(add_ip_to_listener/0 :: () -> 'ok').
+-spec(internal_exchanges/0 :: () -> 'ok').
+-spec(user_to_internal_user/0 :: () -> 'ok').
-endif.
@@ -58,7 +62,7 @@ hash_passwords() ->
mnesia(
rabbit_user,
fun ({user, Username, Password, IsAdmin}) ->
- Hash = rabbit_access_control:hash_password(Password),
+ Hash = rabbit_auth_backend_internal:hash_password(Password),
{user, Username, Hash, IsAdmin}
end,
[username, password_hash, is_admin]).
@@ -71,8 +75,33 @@ add_ip_to_listener() ->
end,
[node, protocol, host, ip_address, port]).
+internal_exchanges() ->
+ Tables = [rabbit_exchange, rabbit_durable_exchange],
+ AddInternalFun =
+ fun ({exchange, Name, Type, Durable, AutoDelete, Args}) ->
+ {exchange, Name, Type, Durable, AutoDelete, false, Args}
+ end,
+ [ ok = mnesia(T,
+ AddInternalFun,
+ [name, type, durable, auto_delete, internal, arguments])
+ || T <- Tables ],
+ ok.
+
+user_to_internal_user() ->
+ mnesia(
+ rabbit_user,
+ fun({user, Username, PasswordHash, IsAdmin}) ->
+ {internal_user, Username, PasswordHash, IsAdmin}
+ end,
+ [username, password_hash, is_admin], internal_user).
+
%%--------------------------------------------------------------------
mnesia(TableName, Fun, FieldList) ->
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList),
ok.
+
+mnesia(TableName, Fun, FieldList, NewRecordName) ->
+ {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList,
+ NewRecordName),
+ ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0db51165..665cac96 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -323,7 +323,7 @@
timestamp :: timestamp() }).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
- count :: non_neg_integer (),
+ count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
-type(sync() :: #sync { acks_persistent :: [[seq_id()]],
@@ -412,7 +412,9 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
init(QueueName, IsDurable, Recover,
- fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
+ fun (Guids, ActionTaken) ->
+ msgs_written_to_disk(Self, Guids, ActionTaken)
+ end,
fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
@@ -519,7 +521,9 @@ publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, #basic_message { guid = Guid },
+ _MsgProps, State = #vqstate { len = 0 }) ->
+ blind_confirm(self(), gb_sets:singleton(Guid)),
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
@@ -531,20 +535,20 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, a(reduce_memory_use(
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
- unconfirmed = Unconfirmed1 }))}.
+ unconfirmed = UC1 }))}.
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -654,15 +658,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- {Guids, State1} =
- ack(fun msg_store_remove/3,
- fun ({_IsPersistent, Guid, _MsgProps}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1);
- (#msg_status{msg = #basic_message{guid = Guid}}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1)
- end,
- AckTags, State),
- {Guids, a(State1)}.
+ a(ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
@@ -712,7 +710,7 @@ tx_commit(Txn, Fun, MsgPropsFun,
end)}.
requeue(AckTags, MsgPropsFun, State) ->
- {_Guids, State1} =
+ a(reduce_memory_use(
ack(fun msg_store_release/3,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
@@ -727,8 +725,7 @@ requeue(AckTags, MsgPropsFun, State) ->
true, true, State2),
State3
end,
- AckTags, State),
- a(reduce_memory_use(State1)).
+ AckTags, State))).
len(#vqstate { len = Len }) -> Len.
@@ -812,17 +809,22 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State),
- Res;
-needs_idle_timeout(_State) ->
- true.
+needs_idle_timeout(State = #vqstate { on_sync = OnSync }) ->
+ case {OnSync, needs_index_sync(State)} of
+ {?BLANK_SYNC, false} ->
+ {Res, _State} = reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State),
+ Res;
+ _ ->
+ true
+ end.
-idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
+idle_timeout(State) ->
+ a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -1096,9 +1098,9 @@ blank_rate(Timestamp, IngressLength) ->
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)
+ Self, fun (StateN) -> {[], tx_commit_post_msg_store(
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)}
end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
@@ -1160,7 +1162,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- {_Guids, NewState} = ack(Acks, State),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
@@ -1172,7 +1173,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
{SeqId, State3} =
publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, NewState}, Pubs),
+ end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1236,7 +1237,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
persistent_count = PCount,
durable = IsDurable,
ram_msg_count = RamMsgCount,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
@@ -1246,13 +1247,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
ram_msg_count = RamMsgCount + 1,
- unconfirmed = Unconfirmed1 }}.
+ unconfirmed = UC1 }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
@@ -1311,10 +1312,8 @@ record_pending_ack(#msg_status { seq_id = SeqId,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true ->
- {{IsPersistent, Guid, MsgProps}, RAI};
- false ->
- {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ true -> {{IsPersistent, Guid, MsgProps}, RAI};
+ false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
State #vqstate { pending_ack = PA1,
@@ -1325,8 +1324,8 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
- {[], orddict:new()}, PA),
+ {PersistentSeqIds, GuidsByStore} =
+ dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
@@ -1336,18 +1335,17 @@ remove_pending_ack(KeepPersistent,
Guids),
State1
end;
- false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = orddict:fold(
- fun (IsPersistent, Guids, ok) ->
- msg_store_remove(MSCState, IsPersistent, Guids)
- end, ok, GuidsByStore),
+ false -> IndexState1 =
+ rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ [ok = msg_store_remove(MSCState, IsPersistent, Guids)
+ || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
State1 #vqstate { index_state = IndexState1 }
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- {[], State};
+ State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{SeqIds, GuidsByStore},
+ {{PersistentSeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1361,27 +1359,27 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
pending_ack = dict:erase(SeqId, PA),
ram_ack_index =
gb_trees:delete_any(SeqId, RAI)})}
- end, {{[], orddict:new()}, State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- AckdGuids = lists:concat(
- orddict:fold(
- fun (IsPersistent, Guids, Gs) ->
- MsgStoreFun(MSCState, IsPersistent, Guids),
- [Guids | Gs]
- end, [], GuidsByStore)),
+ end, {accumulate_ack_init(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ [ok = MsgStoreFun(MSCState, IsPersistent, Guids)
+ || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {AckdGuids, State1 #vqstate { index_state = IndexState1,
+ State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
+ ack_out_counter = AckOutCount + length(AckTags) }.
+
+accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false }, Acc) ->
- Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) ->
- {cons_if(IsPersistent, SeqId, SeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}.
+ index_on_disk = false },
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore};
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1393,6 +1391,13 @@ find_persistent_count(LensByStore) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
+confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
+ case needs_index_sync(State) of
+ true -> State #vqstate {
+ index_state = rabbit_queue_index:sync(IndexState) };
+ false -> State
+ end.
+
remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1400,14 +1405,35 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
unconfirmed = gb_sets:difference(UC, GuidSet) }.
+needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ %% If UC is empty then by definition, MIOD and MOD are also empty
+ %% and there's nothing that can be pending a sync.
+
+ %% If UC is not empty, then we want to find is_empty(UC - MIOD),
+ %% but the subtraction can be expensive. Thus instead, we test to
+ %% see if UC is a subset of MIOD. This can only be the case if
+ %% MIOD == UC, which would indicate that every message in UC is
+ %% also in MIOD and is thus _all_ pending on a msg_store sync, not
+ %% on a qi sync. Thus the negation of this is sufficient. Because
+ %% is_subset is short circuiting, this is more efficient than the
+ %% subtraction.
+ not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
+
msgs_confirmed(GuidSet, State) ->
- {{confirm, gb_sets:to_list(GuidSet)}, remove_confirms(GuidSet, State)}.
+ {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
+
+blind_confirm(QPid, GuidSet) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
-msgs_written_to_disk(QPid, GuidSet) ->
+msgs_written_to_disk(QPid, GuidSet, removed) ->
+ blind_confirm(QPid, GuidSet);
+msgs_written_to_disk(QPid, GuidSet, written) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ QPid, fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
State #vqstate {
msgs_on_disk =
@@ -1417,9 +1443,9 @@ msgs_written_to_disk(QPid, GuidSet) ->
msg_indices_written_to_disk(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ QPid, fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
State #vqstate {
msg_indices_on_disk =
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
new file mode 100644
index 00000000..f939a3fe
--- /dev/null
+++ b/src/rabbit_vhost.erl
@@ -0,0 +1,122 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_vhost).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-export([add/1, delete/1, exists/1, list/0, with/2]).
+
+-ifdef(use_specs).
+
+-spec(add/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(delete/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(exists/1 :: (rabbit_types:vhost()) -> boolean()).
+-spec(list/0 :: () -> [rabbit_types:vhost()]).
+-spec(with/2 :: (rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+add(VHostPath) ->
+ R = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_vhost, VHostPath}) of
+ [] ->
+ ok = mnesia:write(rabbit_vhost,
+ #vhost{virtual_host = VHostPath},
+ write),
+ [rabbit_exchange:declare(
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
+ ok;
+ [_] ->
+ mnesia:abort({vhost_already_exists, VHostPath})
+ end
+ end),
+ rabbit_log:info("Added vhost ~p~n", [VHostPath]),
+ R.
+
+delete(VHostPath) ->
+ %%FIXME: We are forced to delete the queues outside the TX below
+ %%because queue deletion involves sending messages to the queue
+ %%process, which in turn results in further mnesia actions and
+ %%eventually the termination of that process.
+ lists:foreach(fun (Q) ->
+ {ok,_} = rabbit_amqqueue:delete(Q, false, false)
+ end,
+ rabbit_amqqueue:list(VHostPath)),
+ R = rabbit_misc:execute_mnesia_transaction(
+ with(VHostPath, fun () ->
+ ok = internal_delete(VHostPath)
+ end)),
+ rabbit_log:info("Deleted vhost ~p~n", [VHostPath]),
+ R.
+
+internal_delete(VHostPath) ->
+ lists:foreach(fun (#exchange{name = Name}) ->
+ ok = rabbit_exchange:delete(Name, false)
+ end,
+ rabbit_exchange:list(VHostPath)),
+ lists:foreach(
+ fun ({Username, _, _, _}) ->
+ ok = rabbit_auth_backend_internal:clear_permissions(Username,
+ VHostPath)
+ end,
+ rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)),
+ ok = mnesia:delete({rabbit_vhost, VHostPath}),
+ ok.
+
+exists(VHostPath) ->
+ mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].
+
+list() ->
+ mnesia:dirty_all_keys(rabbit_vhost).
+
+with(VHostPath, Thunk) ->
+ fun () ->
+ case mnesia:read({rabbit_vhost, VHostPath}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHostPath});
+ [_V] ->
+ Thunk()
+ end
+ end.