summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-12 14:56:54 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-12 14:56:54 +0000
commit09c5c41ef07eb52c08d493e646f141d9181a2e6a (patch)
treee18b564b983c34c9326a6d03d4ecda12ef4511f2
parentb7e4bb978ddac088e0151037db35c8cf2fd4667e (diff)
parent5990a0a00e7ec2c79e75c0f62bf89311bee3036c (diff)
downloadrabbitmq-server-09c5c41ef07eb52c08d493e646f141d9181a2e6a.tar.gz
merge default into bug23643
-rw-r--r--docs/rabbitmqctl.1.xml34
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl8
-rw-r--r--include/rabbit_auth_backend_spec.hrl46
-rw-r--r--include/rabbit_backing_queue_spec.hrl2
-rw-r--r--scripts/rabbitmq-multi.bat2
-rw-r--r--scripts/rabbitmq-server.bat2
-rw-r--r--scripts/rabbitmqctl.bat2
-rw-r--r--src/rabbit.erl14
-rw-r--r--src/rabbit_access_control.erl434
-rw-r--r--src/rabbit_amqqueue_process.erl150
-rw-r--r--src/rabbit_auth_backend.erl76
-rw-r--r--src/rabbit_auth_backend_internal.erl347
-rw-r--r--src/rabbit_auth_mechanism_external.erl2
-rw-r--r--src/rabbit_channel.erl230
-rw-r--r--src/rabbit_channel_sup.erl6
-rw-r--r--src/rabbit_connection_sup.erl1
-rw-r--r--src/rabbit_control.erl33
-rw-r--r--src/rabbit_misc.erl22
-rw-r--r--src/rabbit_mnesia.erl6
-rw-r--r--src/rabbit_msg_store.erl242
-rw-r--r--src/rabbit_net.erl10
-rw-r--r--src/rabbit_queue_index.erl26
-rw-r--r--src/rabbit_reader.erl34
-rw-r--r--src/rabbit_tests.erl30
-rw-r--r--src/rabbit_types.erl21
-rw-r--r--src/rabbit_upgrade_functions.erl17
-rw-r--r--src/rabbit_variable_queue.erl108
-rw-r--r--src/rabbit_vhost.erl122
29 files changed, 1269 insertions, 759 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 01ddd4c1..2152cab3 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -398,7 +398,12 @@
<refsect2>
<title>User management</title>
-
+ <para>
+ Note that <command>rabbitmqctl</command> manages the RabbitMQ
+ internal user database. Users from any alternative
+ authentication backend will not be visible
+ to <command>rabbitmqctl</command>.
+ </para>
<variablelist>
<varlistentry>
<term><cmdsynopsis><command>add_user</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>password</replaceable></arg></cmdsynopsis></term>
@@ -544,7 +549,12 @@
<refsect2>
<title>Access control</title>
-
+ <para>
+ Note that <command>rabbitmqctl</command> manages the RabbitMQ
+ internal user database. Permissions for users from any
+ alternative authorisation backend will not be visible
+ to <command>rabbitmqctl</command>.
+ </para>
<variablelist>
<varlistentry>
<term><cmdsynopsis><command>add_vhost</command> <arg choice="req"><replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
@@ -1000,6 +1010,26 @@
connection is secured with SSL.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>ssl_protocol</term>
+ <listitem><para>SSL protocol
+ (e.g. tlsv1)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_key_exchange</term>
+ <listitem><para>SSL key exchange algorithm
+ (e.g. rsa)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_cipher</term>
+ <listitem><para>SSL cipher algorithm
+ (e.g. aes_256_cbc)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_hash</term>
+ <listitem><para>SSL hash function
+ (e.g. sha)</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>peer_cert_subject</term>
<listitem><para>The subject of the peer's SSL
certificate, in RFC4514 form.</para></listitem>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 25d630c0..5ed872b6 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -32,4 +32,5 @@
{server_properties, []},
{collect_statistics, none},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
+ {auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16}]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 8c8e12a1..81c3996b 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -29,7 +29,13 @@
%% Contributor(s): ______________________________________.
%%
--record(user, {username, password_hash, is_admin}).
+-record(user, {username,
+ is_admin,
+ auth_backend, %% Module this user came from
+ impl %% Scratch space for that module
+ }).
+
+-record(internal_user, {username, password_hash, is_admin}).
-record(permission, {configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl
new file mode 100644
index 00000000..a96c18d8
--- /dev/null
+++ b/include/rabbit_auth_backend_spec.hrl
@@ -0,0 +1,46 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-ifdef(use_specs).
+
+-spec(description/0 :: () -> [{atom(), any()}]).
+
+-spec(check_user_login/2 :: (rabbit_types:username(), [term()]) ->
+ {'ok', rabbit_types:user()} |
+ {'refused', string(), [any()]} |
+ {'error', any()}).
+-spec(check_vhost_access/3 :: (rabbit_types:user(), rabbit_types:vhost(),
+ rabbit_access_control:vhost_permission_atom()) ->
+ boolean() | {'error', any()}).
+-spec(check_resource_access/3 :: (rabbit_types:user(),
+ rabbit_types:r(atom()),
+ rabbit_access_control:permission_atom()) ->
+ boolean() | {'error', any()}).
+-endif.
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index f67c6f46..6fa34ccc 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -58,7 +58,7 @@
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
--spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
rabbit_types:message_properties(), state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index a4f8c8b4..ec61dc99 100644
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -89,7 +89,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
-pa "!TDP0!..\ebin" ^
-noinput -hidden ^
!RABBITMQ_MULTI_ERL_ARGS! ^
--sname rabbitmq_multi ^
+-sname rabbitmq_multi!RANDOM! ^
!RABBITMQ_CONFIG_ARG! ^
-s rabbit_multi ^
!RABBITMQ_MULTI_START_ARGS! ^
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 52a250c6..ec5b4d85 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -118,7 +118,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
--sname rabbitmqprelaunch%RANDOM% ^
+-sname rabbitmqprelaunch!RANDOM! ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 563b9e58..4ffde73f 100644
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -58,7 +58,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2ebfdecf..954e289b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -458,16 +458,16 @@ insert_default_data() ->
{ok, DefaultVHost} = application:get_env(default_vhost),
{ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
application:get_env(default_permissions),
- ok = rabbit_access_control:add_vhost(DefaultVHost),
- ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
+ ok = rabbit_vhost:add(DefaultVHost),
+ ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass),
case DefaultAdmin of
- true -> rabbit_access_control:set_admin(DefaultUser);
+ true -> rabbit_auth_backend_internal:set_admin(DefaultUser);
_ -> ok
end,
- ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost,
- DefaultConfigurePerm,
- DefaultWritePerm,
- DefaultReadPerm),
+ ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost,
+ DefaultConfigurePerm,
+ DefaultWritePerm,
+ DefaultReadPerm),
ok.
rotate_logs(File, Suffix, Handler) ->
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 51adbac8..02a65442 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -30,70 +30,35 @@
%%
-module(rabbit_access_control).
--include_lib("stdlib/include/qlc.hrl").
+
-include("rabbit.hrl").
--export([user_pass_login/2, check_user_pass_login/2, make_salt/0,
- check_vhost_access/2, check_resource_access/3]).
--export([add_user/2, delete_user/1, change_password/2, set_admin/1,
- clear_admin/1, list_users/0, lookup_user/1, clear_password/1]).
--export([change_password_hash/2, hash_password/1]).
--export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
--export([set_permissions/5, clear_permissions/2,
- list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
- list_user_vhost_permissions/2]).
+-export([user_pass_login/2, check_user_pass_login/2, check_user_login/2,
+ check_vhost_access/2, check_resource_access/3, list_vhosts/2]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([username/0, password/0, password_hash/0]).
+-export_type([permission_atom/0, vhost_permission_atom/0]).
-type(permission_atom() :: 'configure' | 'read' | 'write').
--type(username() :: binary()).
--type(password() :: binary()).
--type(password_hash() :: binary()).
--type(regexp() :: binary()).
+-type(vhost_permission_atom() :: 'read' | 'write').
+
-spec(user_pass_login/2 ::
- (username(), password())
+ (rabbit_types:username(), rabbit_types:password())
-> rabbit_types:user() | rabbit_types:channel_exit()).
-spec(check_user_pass_login/2 ::
- (username(), password())
+ (rabbit_types:username(), rabbit_types:password())
-> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
--spec(make_salt/0 :: () -> binary()).
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
-> 'ok' | rabbit_types:channel_exit()).
-spec(check_resource_access/3 ::
- (username(), rabbit_types:r(atom()), permission_atom())
+ (rabbit_types:user(), rabbit_types:r(atom()), permission_atom())
-> 'ok' | rabbit_types:channel_exit()).
--spec(add_user/2 :: (username(), password()) -> 'ok').
--spec(delete_user/1 :: (username()) -> 'ok').
--spec(change_password/2 :: (username(), password()) -> 'ok').
--spec(clear_password/1 :: (username()) -> 'ok').
--spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok').
--spec(hash_password/1 :: (password()) -> password_hash()).
--spec(set_admin/1 :: (username()) -> 'ok').
--spec(clear_admin/1 :: (username()) -> 'ok').
--spec(list_users/0 :: () -> [{username(), boolean()}]).
--spec(lookup_user/1 ::
- (username()) -> rabbit_types:ok(rabbit_types:user())
- | rabbit_types:error('not_found')).
--spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
--spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
--spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
--spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
- regexp(), regexp()) -> 'ok').
--spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
--spec(list_permissions/0 ::
- () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
--spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
--spec(list_user_permissions/1 ::
- (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
--spec(list_user_vhost_permissions/2 ::
- (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]).
+-spec(list_vhosts/2 :: (rabbit_types:user(), vhost_permission_atom())
+ -> [rabbit_types:vhost()]).
-endif.
@@ -109,314 +74,79 @@ user_pass_login(User, Pass) ->
U
end.
-check_user_pass_login(Username, Pass) ->
- Refused = {refused, "user '~s' - invalid credentials", [Username]},
- case lookup_user(Username) of
- {ok, User} ->
- case check_password(Pass, User#user.password_hash) of
- true -> {ok, User};
- _ -> Refused
- end;
- {error, not_found} ->
- Refused
- end.
-
-internal_lookup_vhost_access(Username, VHostPath) ->
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHostPath}}) of
- [] -> not_found;
- [R] -> {ok, R}
- end
- end).
-
-check_vhost_access(#user{username = Username}, VHostPath) ->
+check_user_pass_login(Username, Password) ->
+ check_user_login(Username, [{password, Password}]).
+
+check_user_login(Username, AuthProps) ->
+ {ok, Modules} = application:get_env(rabbit, auth_backends),
+ lists:foldl(
+ fun(Module, {refused, _, _}) ->
+ case Module:check_user_login(Username, AuthProps) of
+ {error, E} ->
+ {refused, "~s failed authenticating ~s: ~p~n",
+ [Module, Username, E]};
+ Else ->
+ Else
+ end;
+ (_, {ok, User}) ->
+ {ok, User}
+ end, {refused, "No modules checked '~s'", [Username]}, Modules).
+
+check_vhost_access(User = #user{ username = Username,
+ auth_backend = Module }, VHostPath) ->
?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
- case internal_lookup_vhost_access(Username, VHostPath) of
- {ok, _R} ->
- ok;
- not_found ->
- rabbit_misc:protocol_error(
- access_refused, "access to vhost '~s' refused for user '~s'",
- [VHostPath, Username])
- end.
-
-permission_index(configure) -> #permission.configure;
-permission_index(write) -> #permission.write;
-permission_index(read) -> #permission.read.
-
-check_resource_access(Username,
- R = #resource{kind = exchange, name = <<"">>},
+ check_access(
+ fun() ->
+ rabbit_vhost:exists(VHostPath) andalso
+ Module:check_vhost_access(User, VHostPath, write)
+ end,
+ "~s failed checking vhost access to ~s for ~s: ~p~n",
+ [Module, VHostPath, Username],
+ "access to vhost '~s' refused for user '~s'",
+ [VHostPath, Username]).
+
+check_resource_access(User, R = #resource{kind = exchange, name = <<"">>},
Permission) ->
- check_resource_access(Username,
- R#resource{name = <<"amq.default">>},
+ check_resource_access(User, R#resource{name = <<"amq.default">>},
Permission);
-check_resource_access(Username,
- R = #resource{virtual_host = VHostPath, name = Name},
- Permission) ->
- Res = case mnesia:dirty_read({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHostPath}}) of
- [] ->
- false;
- [#user_permission{permission = P}] ->
- PermRegexp =
- case element(permission_index(Permission), P) of
- %% <<"^$">> breaks Emacs' erlang mode
- <<"">> -> <<$^, $$>>;
- RE -> RE
- end,
- case re:run(Name, PermRegexp, [{capture, none}]) of
- match -> true;
- nomatch -> false
- end
- end,
- if Res -> ok;
- true -> rabbit_misc:protocol_error(
- access_refused, "access to ~s refused for user '~s'",
- [rabbit_misc:rs(R), Username])
- end.
-
-add_user(Username, Password) ->
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_user, Username}) of
- [] ->
- ok = mnesia:write(rabbit_user,
- #user{username = Username,
- password_hash =
- hash_password(Password),
- is_admin = false},
- write);
- _ ->
- mnesia:abort({user_already_exists, Username})
- end
- end),
- rabbit_log:info("Created user ~p~n", [Username]),
- R.
-
-delete_user(Username) ->
- R = rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- ok = mnesia:delete({rabbit_user, Username}),
- [ok = mnesia:delete_object(
- rabbit_user_permission, R, write) ||
- R <- mnesia:match_object(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = '_'},
- permission = '_'},
- write)],
- ok
- end)),
- rabbit_log:info("Deleted user ~p~n", [Username]),
- R.
-
-change_password(Username, Password) ->
- change_password_hash(Username, hash_password(Password)).
-
-clear_password(Username) ->
- change_password_hash(Username, <<"">>).
-
-change_password_hash(Username, PasswordHash) ->
- R = update_user(Username, fun(User) ->
- User#user{ password_hash = PasswordHash }
- end),
- rabbit_log:info("Changed password for user ~p~n", [Username]),
- R.
-
-hash_password(Cleartext) ->
- Salt = make_salt(),
- Hash = salted_md5(Salt, Cleartext),
- <<Salt/binary, Hash/binary>>.
-
-check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) ->
- Hash =:= salted_md5(Salt, Cleartext).
-
-make_salt() ->
- {A1,A2,A3} = now(),
- random:seed(A1, A2, A3),
- Salt = random:uniform(16#ffffffff),
- <<Salt:32>>.
-
-salted_md5(Salt, Cleartext) ->
- Salted = <<Salt/binary, Cleartext/binary>>,
- erlang:md5(Salted).
-
-set_admin(Username) ->
- set_admin(Username, true).
-
-clear_admin(Username) ->
- set_admin(Username, false).
-
-set_admin(Username, IsAdmin) ->
- R = update_user(Username, fun(User) ->
- User#user{is_admin = IsAdmin}
- end),
- rabbit_log:info("Set user admin flag for user ~p to ~p~n",
- [Username, IsAdmin]),
- R.
-
-update_user(Username, Fun) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- {ok, User} = lookup_user(Username),
- ok = mnesia:write(rabbit_user, Fun(User), write)
- end)).
-
-list_users() ->
- [{Username, IsAdmin} ||
- #user{username = Username, is_admin = IsAdmin} <-
- mnesia:dirty_match_object(rabbit_user, #user{_ = '_'})].
-
-lookup_user(Username) ->
- rabbit_misc:dirty_read({rabbit_user, Username}).
-
-add_vhost(VHostPath) ->
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_vhost, VHostPath}) of
- [] ->
- ok = mnesia:write(rabbit_vhost,
- #vhost{virtual_host = VHostPath},
- write),
- [rabbit_exchange:declare(
- rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
- ok;
- [_] ->
- mnesia:abort({vhost_already_exists, VHostPath})
- end
- end),
- rabbit_log:info("Added vhost ~p~n", [VHostPath]),
- R.
-
-delete_vhost(VHostPath) ->
- %%FIXME: We are forced to delete the queues outside the TX below
- %%because queue deletion involves sending messages to the queue
- %%process, which in turn results in further mnesia actions and
- %%eventually the termination of that process.
- lists:foreach(fun (Q) ->
- {ok,_} = rabbit_amqqueue:delete(Q, false, false)
- end,
- rabbit_amqqueue:list(VHostPath)),
- R = rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_vhost(
- VHostPath,
- fun () ->
- ok = internal_delete_vhost(VHostPath)
- end)),
- rabbit_log:info("Deleted vhost ~p~n", [VHostPath]),
- R.
-
-internal_delete_vhost(VHostPath) ->
- lists:foreach(fun (#exchange{name = Name}) ->
- ok = rabbit_exchange:delete(Name, false)
- end,
- rabbit_exchange:list(VHostPath)),
- lists:foreach(fun ({Username, _, _, _}) ->
- ok = clear_permissions(Username, VHostPath)
- end,
- list_vhost_permissions(VHostPath)),
- ok = mnesia:delete({rabbit_vhost, VHostPath}),
- ok.
-
-vhost_exists(VHostPath) ->
- mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].
-
-list_vhosts() ->
- mnesia:dirty_all_keys(rabbit_vhost).
-
-validate_regexp(RegexpBin) ->
- Regexp = binary_to_list(RegexpBin),
- case re:compile(Regexp) of
- {ok, _} -> ok;
- {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
+check_resource_access(User = #user{username = Username, auth_backend = Module},
+ Resource, Permission) ->
+ check_access(
+ fun() -> Module:check_resource_access(User, Resource, Permission) end,
+ "~s failed checking resource access to ~p for ~s: ~p~n",
+ [Module, Resource, Username],
+ "access to ~s refused for user '~s'",
+ [rabbit_misc:rs(Resource), Username]).
+
+check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) ->
+ Allow = case Fun() of
+ {error, _} = E ->
+ rabbit_log:error(ErrStr, ErrArgs ++ [E]),
+ false;
+ Else ->
+ Else
+ end,
+ case Allow of
+ true ->
+ ok;
+ false ->
+ rabbit_misc:protocol_error(access_refused, RefStr, RefArgs)
end.
-set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
- lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- fun () -> ok = mnesia:write(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = VHostPath},
- permission = #permission{
- configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}},
- write)
- end)).
-
-
-clear_permissions(Username, VHostPath) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- fun () ->
- ok = mnesia:delete({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHostPath}})
- end)).
-
-list_permissions() ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(match_user_vhost('_', '_'))].
-
-list_vhost_permissions(VHostPath) ->
- [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
- {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(rabbit_misc:with_vhost(
- VHostPath, match_user_vhost('_', VHostPath)))].
-
-list_user_permissions(Username) ->
- [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(rabbit_misc:with_user(
- Username, match_user_vhost(Username, '_')))].
-
-list_user_vhost_permissions(Username, VHostPath) ->
- [{ConfigurePerm, WritePerm, ReadPerm} ||
- {_, _, ConfigurePerm, WritePerm, ReadPerm} <-
- list_permissions(rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- match_user_vhost(Username, VHostPath)))].
-
-list_permissions(QueryThunk) ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- #user_permission{user_vhost = #user_vhost{username = Username,
- virtual_host = VHostPath},
- permission = #permission{ configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(QueryThunk)].
-
-match_user_vhost(Username, VHostPath) ->
- fun () -> mnesia:match_object(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = VHostPath},
- permission = '_'},
- read)
- end.
+%% Permission = write -> log in
+%% Permission = read -> learn of the existence of (only relevant for
+%% management plugin)
+list_vhosts(User = #user{username = Username, auth_backend = Module},
+ Permission) ->
+ lists:filter(
+ fun(VHost) ->
+ case Module:check_vhost_access(User, VHost, Permission) of
+ {error, _} = E ->
+ rabbit_log:warning("~w failed checking vhost access "
+ "to ~s for ~s: ~p~n",
+ [Module, VHost, Username, E]),
+ false;
+ Else ->
+ Else
+ end
+ end, rabbit_vhost:list()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 981dd31d..38b83117 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -374,12 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- {State2, ChAckTags1} =
+ ChAckTags1 =
case AckRequired of
- true -> {State1,
- sets:add_element(AckTag, ChAckTags)};
- false -> {confirm_message(Message, State1),
- ChAckTags}
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -396,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State3 = State2#q{
+ State2 = State1#q{
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State3);
+ deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
true = maybe_store_ch_record(C#cr{is_limit_active = true}),
@@ -427,22 +425,36 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-confirm_messages(Guids, State) ->
- lists:foldl(fun confirm_message_by_guid/2, State, Guids).
-
-confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
- case dict:find(Guid, GTC) of
- {ok, {_ , undefined}} -> ok;
- {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
+confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
+ {CMs, GTC1} =
+ lists:foldl(
+ fun(Guid, {CMs, GTC0}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)};
+ _ ->
+ {CMs, GTC0}
+ end
+ end, {[], GTC}, Guids),
+ case lists:usort(CMs) of
+ [{Ch, MsgSeqNo} | CMs1] ->
+ [rabbit_channel:confirm(ChPid, MsgSeqNos) ||
+ {ChPid, MsgSeqNos} <- group_confirms_by_channel(
+ CMs1, [{Ch, [MsgSeqNo]}])];
+ [] ->
+ ok
end,
- State#q{guid_to_channel = dict:erase(Guid, GTC)}.
+ State#q{guid_to_channel = GTC1}.
-confirm_message(#basic_message{guid = Guid}, State) ->
- confirm_message_by_guid(Guid, State).
+group_confirms_by_channel([], Acc) ->
+ Acc;
+group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]);
+group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]).
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- State;
+ {no_confirm, State};
record_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
@@ -451,14 +463,10 @@ record_confirm_message(#delivery{sender = ChPid,
State =
#q{guid_to_channel = GTC,
q = #amqqueue{durable = true}}) ->
- State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)};
+ {confirm,
+ State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}};
record_confirm_message(_Delivery, State) ->
- State.
-
-ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckdGuids, BQS1} = BQ:ack(AckTags, BQS),
- confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}).
+ {no_confirm, State}.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -473,12 +481,12 @@ attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
- State = #q{backing_queue = BQ, q = Q}) ->
- NeedsConfirming = Message#basic_message.is_persistent andalso
- Q#amqqueue.durable,
- case NeedsConfirming of
- false -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
+ {NeedsConfirming, State = #q{backing_queue = BQ}}) ->
+ %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
+ case {NeedsConfirming, MsgSeqNo} of
+ {_, undefined} -> ok;
+ {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ {confirm, _} -> ok
end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -490,31 +498,37 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = NeedsConfirming},
+ needs_confirming = (NeedsConfirming =:= confirm)},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
+ {Delivered, State1} =
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
+ {Delivered, NeedsConfirming, State1};
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ {NeedsConfirming,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
record_current_channel_tx(ChPid, Txn),
{true,
+ NeedsConfirming,
State#q{backing_queue_state =
BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, State1} ->
+ {true, _, State1} ->
{true, State1};
- {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery,
+ {false, NeedsConfirming, State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}} ->
+ #delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
- needs_confirming = (MsgSeqNo =/= undefined)},
+ needs_confirming =
+ (NeedsConfirming =:= confirm)},
BQS),
{false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
@@ -771,18 +785,19 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {maybe_run_queue_via_backing_queue, _Fun} -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -823,7 +838,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
+handle_call({deliver_immediately, Delivery},
_From, State) ->
%% Synchronous, "immediate" delivery mode
%%
@@ -838,17 +853,15 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, State1} =
+ {Delivered, _NeedsConfirming, State1} =
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, case Delivered of
- true -> State1;
- false -> confirm_message(Message, State1)
- end);
+ reply(Delivered, State1);
-handle_call({deliver, Delivery}, _From, State) ->
- %% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- reply(Delivered, NewState);
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" delivery mode. Reply asap.
+ gen_server2:reply(From, true),
+ {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
+ noreply(NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
NewState = commit_transaction(Txn, From, ChPid, State),
@@ -881,7 +894,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
sets:add_element(AckTag,
ChAckTags)}),
State2;
- false -> confirm_message(Message, State2)
+ false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
reply({ok, Remaining, Msg}, State3)
@@ -1019,8 +1032,8 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- NewState = ack_by_acktags(AckTags, State),
- {NewC, NewState};
+ BQS1 = BQ:ack(AckTags, BQS),
+ {NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
State#q{backing_queue_state = BQS1}}
@@ -1029,7 +1042,9 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State1)
end;
-handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
+handle_cast({reject, AckTags, Requeue, ChPid},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -1038,7 +1053,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> ack_by_acktags(AckTags, State)
+ false -> BQS1 = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1}
end)
end;
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
new file mode 100644
index 00000000..0dc8e61b
--- /dev/null
+++ b/src/rabbit_auth_backend.erl
@@ -0,0 +1,76 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_backend).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% A description proplist as with auth mechanisms,
+ %% exchanges. Currently unused.
+ {description, 0},
+
+ %% Check a user can log in, given a username and a proplist of
+ %% authentication information (e.g. [{password, Password}]).
+ %%
+ %% Possible responses:
+ %% {ok, User}
+ %% Authentication succeeded, and here's the user record.
+ %% {error, Error}
+ %% Something went wrong. Log and die.
+ %% {refused, Msg, Args}
+ %% Client failed authentication. Log and die.
+ {check_user_login, 2},
+
+ %% Given #user, vhost path and permission, can a user access a vhost?
+ %% Permission is read - learn of the existence of (only relevant for
+ %% management plugin)
+ %% or write - log in
+ %%
+ %% Possible responses:
+ %% true
+ %% false
+ %% {error, Error}
+ %% Something went wrong. Log and die.
+ {check_vhost_access, 3},
+
+ %% Given #user, resource and permission, can a user access a resource?
+ %%
+ %% Possible responses:
+ %% true
+ %% false
+ %% {error, Error}
+ %% Something went wrong. Log and die.
+ {check_resource_access, 3}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
new file mode 100644
index 00000000..233e2b90
--- /dev/null
+++ b/src/rabbit_auth_backend_internal.erl
@@ -0,0 +1,347 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_backend_internal).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_backend).
+
+-export([description/0]).
+-export([check_user_login/2, check_vhost_access/3, check_resource_access/3]).
+
+-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
+ clear_admin/1, list_users/0, lookup_user/1, clear_password/1]).
+-export([make_salt/0, check_password/2, change_password_hash/2,
+ hash_password/1]).
+-export([set_permissions/5, clear_permissions/2,
+ list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
+ list_user_vhost_permissions/2]).
+
+-include("rabbit_auth_backend_spec.hrl").
+
+-ifdef(use_specs).
+
+-type(regexp() :: binary()).
+
+-spec(add_user/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok').
+-spec(delete_user/1 :: (rabbit_types:username()) -> 'ok').
+-spec(change_password/2 :: (rabbit_types:username(), rabbit_types:password())
+ -> 'ok').
+-spec(clear_password/1 :: (rabbit_types:username()) -> 'ok').
+-spec(make_salt/0 :: () -> binary()).
+-spec(check_password/2 :: (rabbit_types:password(),
+ rabbit_types:password_hash()) -> boolean()).
+-spec(change_password_hash/2 :: (rabbit_types:username(),
+ rabbit_types:password_hash()) -> 'ok').
+-spec(hash_password/1 :: (rabbit_types:password())
+ -> rabbit_types:password_hash()).
+-spec(set_admin/1 :: (rabbit_types:username()) -> 'ok').
+-spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok').
+-spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]).
+-spec(lookup_user/1 :: (rabbit_types:username())
+ -> rabbit_types:ok(rabbit_types:internal_user())
+ | rabbit_types:error('not_found')).
+-spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(),
+ regexp(), regexp(), regexp()) -> 'ok').
+-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
+ -> 'ok').
+-spec(list_permissions/0 ::
+ () -> [{rabbit_types:username(), rabbit_types:vhost(),
+ regexp(), regexp(), regexp()}]).
+-spec(list_vhost_permissions/1 ::
+ (rabbit_types:vhost()) -> [{rabbit_types:username(),
+ regexp(), regexp(), regexp()}]).
+-spec(list_user_permissions/1 ::
+ (rabbit_types:username()) -> [{rabbit_types:vhost(),
+ regexp(), regexp(), regexp()}]).
+-spec(list_user_vhost_permissions/2 ::
+ (rabbit_types:username(), rabbit_types:vhost())
+ -> [{regexp(), regexp(), regexp()}]).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% Implementation of rabbit_auth_backend
+
+description() ->
+ [{name, <<"Internal">>},
+ {description, <<"Internal user / password database">>}].
+
+check_user_login(Username, []) ->
+ internal_check_user_login(Username, fun(_) -> true end);
+check_user_login(Username, [{password, Password}]) ->
+ internal_check_user_login(
+ Username,
+ fun(#internal_user{password_hash = Hash}) ->
+ check_password(Password, Hash)
+ end);
+check_user_login(Username, AuthProps) ->
+ exit({unknown_auth_props, Username, AuthProps}).
+
+internal_check_user_login(Username, Fun) ->
+ Refused = {refused, "user '~s' - invalid credentials", [Username]},
+ case lookup_user(Username) of
+ {ok, User = #internal_user{is_admin = IsAdmin}} ->
+ case Fun(User) of
+ true -> {ok, #user{username = Username,
+ is_admin = IsAdmin,
+ auth_backend = ?MODULE,
+ impl = User}};
+ _ -> Refused
+ end;
+ {error, not_found} ->
+ Refused
+ end.
+
+check_vhost_access(#user{is_admin = true}, _VHostPath, read) ->
+ true;
+
+check_vhost_access(#user{username = Username}, VHostPath, _) ->
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] -> false;
+ [_R] -> true
+ end
+ end).
+
+check_resource_access(#user{username = Username},
+ #resource{virtual_host = VHostPath, name = Name},
+ Permission) ->
+ case mnesia:dirty_read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] ->
+ false;
+ [#user_permission{permission = P}] ->
+ PermRegexp =
+ case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
+ case re:run(Name, PermRegexp, [{capture, none}]) of
+ match -> true;
+ nomatch -> false
+ end
+ end.
+
+permission_index(configure) -> #permission.configure;
+permission_index(write) -> #permission.write;
+permission_index(read) -> #permission.read.
+
+%%----------------------------------------------------------------------------
+%% Manipulation of the user database
+
+add_user(Username, Password) ->
+ R = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_user, Username}) of
+ [] ->
+ ok = mnesia:write(
+ rabbit_user,
+ #internal_user{username = Username,
+ password_hash =
+ hash_password(Password),
+ is_admin = false},
+ write);
+ _ ->
+ mnesia:abort({user_already_exists, Username})
+ end
+ end),
+ rabbit_log:info("Created user ~p~n", [Username]),
+ R.
+
+delete_user(Username) ->
+ R = rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ ok = mnesia:delete({rabbit_user, Username}),
+ [ok = mnesia:delete_object(
+ rabbit_user_permission, R, write) ||
+ R <- mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = '_'},
+ permission = '_'},
+ write)],
+ ok
+ end)),
+ rabbit_log:info("Deleted user ~p~n", [Username]),
+ R.
+
+change_password(Username, Password) ->
+ change_password_hash(Username, hash_password(Password)).
+
+clear_password(Username) ->
+ change_password_hash(Username, <<"">>).
+
+change_password_hash(Username, PasswordHash) ->
+ R = update_user(Username, fun(User) ->
+ User#internal_user{
+ password_hash = PasswordHash }
+ end),
+ rabbit_log:info("Changed password for user ~p~n", [Username]),
+ R.
+
+hash_password(Cleartext) ->
+ Salt = make_salt(),
+ Hash = salted_md5(Salt, Cleartext),
+ <<Salt/binary, Hash/binary>>.
+
+check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) ->
+ Hash =:= salted_md5(Salt, Cleartext).
+
+make_salt() ->
+ {A1,A2,A3} = now(),
+ random:seed(A1, A2, A3),
+ Salt = random:uniform(16#ffffffff),
+ <<Salt:32>>.
+
+salted_md5(Salt, Cleartext) ->
+ Salted = <<Salt/binary, Cleartext/binary>>,
+ erlang:md5(Salted).
+
+set_admin(Username) ->
+ set_admin(Username, true).
+
+clear_admin(Username) ->
+ set_admin(Username, false).
+
+set_admin(Username, IsAdmin) ->
+ R = update_user(Username, fun(User) ->
+ User#internal_user{is_admin = IsAdmin}
+ end),
+ rabbit_log:info("Set user admin flag for user ~p to ~p~n",
+ [Username, IsAdmin]),
+ R.
+
+update_user(Username, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ {ok, User} = lookup_user(Username),
+ ok = mnesia:write(rabbit_user, Fun(User), write)
+ end)).
+
+list_users() ->
+ [{Username, IsAdmin} ||
+ #internal_user{username = Username, is_admin = IsAdmin} <-
+ mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
+
+lookup_user(Username) ->
+ rabbit_misc:dirty_read({rabbit_user, Username}).
+
+validate_regexp(RegexpBin) ->
+ Regexp = binary_to_list(RegexpBin),
+ case re:compile(Regexp) of
+ {ok, _} -> ok;
+ {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
+ end.
+
+set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ fun () -> ok = mnesia:write(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}},
+ write)
+ end)).
+
+
+clear_permissions(Username, VHostPath) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ fun () ->
+ ok = mnesia:delete({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}})
+ end)).
+
+list_permissions() ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(match_user_vhost('_', '_'))].
+
+list_vhost_permissions(VHostPath) ->
+ [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_vhost:with(
+ VHostPath, match_user_vhost('_', VHostPath)))].
+
+list_user_permissions(Username) ->
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_user(
+ Username, match_user_vhost(Username, '_')))].
+
+list_user_vhost_permissions(Username, VHostPath) ->
+ [{ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ match_user_vhost(Username, VHostPath)))].
+
+list_permissions(QueryThunk) ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ #user_permission{user_vhost = #user_vhost{username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}} <-
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(QueryThunk)].
+
+match_user_vhost(Username, VHostPath) ->
+ fun () -> mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = '_'},
+ read)
+ end.
diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl
index 6572f786..1c4e5c15 100644
--- a/src/rabbit_auth_mechanism_external.erl
+++ b/src/rabbit_auth_mechanism_external.erl
@@ -81,7 +81,7 @@ handle_response(_Response, #state{username = Username}) ->
{refused, _, _} = E ->
E;
_ ->
- case rabbit_access_control:lookup_user(Username) of
+ case rabbit_access_control:check_user_login(Username, []) of
{ok, User} ->
{ok, User};
{error, not_found} ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index edafd52d..930e48e6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]).
+-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1]).
@@ -47,10 +47,9 @@
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host, most_recently_declared_queue,
+ user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
- held_confirms, unconfirmed, queues_for_msg}).
+ confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -72,8 +71,6 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(FLUSH_CONFIRMS_INTERVAL, 1000).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -83,7 +80,7 @@
-type(channel_number() :: non_neg_integer()).
-spec(start_link/7 ::
- (channel_number(), pid(), pid(), rabbit_access_control:username(),
+ (channel_number(), pid(), pid(), rabbit_types:user(),
rabbit_types:vhost(), pid(),
fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
@@ -97,8 +94,7 @@
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
--spec(flush_confirms/1 :: (pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -111,9 +107,9 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
StartLimiterFun) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username,
+ gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User,
VHost, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
@@ -137,11 +133,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-confirm(Pid, MsgSeqNo) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
-
-flush_confirms(Pid) ->
- gen_server2:cast(Pid, flush_confirms).
+confirm(Pid, MsgSeqNos) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
list() ->
pg_local:get_members(rabbit_channels).
@@ -168,7 +161,7 @@ emit_stats(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
@@ -184,7 +177,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
next_tag = 1,
uncommitted_ack_q = queue:new(),
unacked_message_q = queue:new(),
- username = Username,
+ user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
@@ -192,9 +185,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
- publish_seqno = 0,
- confirm_multiple = false,
- held_confirms = gb_sets:new(),
+ publish_seqno = 1,
unconfirmed = gb_sets:new(),
queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -292,11 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast(flush_confirms, State) ->
- {noreply, internal_flush_confirms(State)};
-
-handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, confirm(MsgSeqNo, From, State)}.
+handle_cast({confirm, MsgSeqNos, From}, State) ->
+ {noreply, confirm(MsgSeqNos, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{queues_for_msg = QFM}) ->
@@ -304,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> confirm(Msg, QPid, State0);
+ 0 -> confirm([Msg], QPid, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -312,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
-handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- State1 = internal_flush_confirms(State),
rabbit_event:if_enabled(StatsTimer,
fun () ->
internal_emit_stats(
State, [{idle_since, now()}])
end),
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
- {hibernate, State1#ch{stats_timer = StatsTimer1}}.
+ {hibernate, State#ch{stats_timer = StatsTimer1}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -371,7 +358,7 @@ return_queue_declare_ok(#resource{name = ActualName},
message_count = MessageCount,
consumer_count = ConsumerCount}).
-check_resource_access(Username, Resource, Perm) ->
+check_resource_access(User, Resource, Perm) ->
V = {Resource, Perm},
Cache = case get(permission_cache) of
undefined -> [];
@@ -381,7 +368,7 @@ check_resource_access(Username, Resource, Perm) ->
case lists:member(V, Cache) of
true -> lists:delete(V, Cache);
false -> ok = rabbit_access_control:check_resource_access(
- Username, Resource, Perm),
+ User, Resource, Perm),
lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
end,
put(permission_cache, [V | CacheTail]),
@@ -391,14 +378,25 @@ clear_permission_cache() ->
erase(permission_cache),
ok.
-check_configure_permitted(Resource, #ch{username = Username}) ->
- check_resource_access(Username, Resource, configure).
+check_configure_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, configure).
-check_write_permitted(Resource, #ch{username = Username}) ->
- check_resource_access(Username, Resource, write).
+check_write_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, write).
-check_read_permitted(Resource, #ch{username = Username}) ->
- check_resource_access(Username, Resource, read).
+check_read_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, read).
+
+check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
+ ok;
+check_user_id_header(#'P_basic'{user_id = Username},
+ #ch{user = #user{username = Username}}) ->
+ ok;
+check_user_id_header(#'P_basic'{user_id = Claimed},
+ #ch{user = #user{username = Actual}}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "user_id property set to '~s' but "
+ "authenticated user was '~s'", [Claimed, Actual]).
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
@@ -473,51 +471,39 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-confirm(undefined, _QPid, State) ->
+confirm([], _QPid, State) ->
State;
-confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{
- delivery_tag = MSN}),
- State1
- end, State);
-confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_confirm_timer(
- State1#ch{held_confirms = gb_sets:add(MSN, As)})
- end, State).
-
-do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
- State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
- %% clears references to MsgSeqNo and does ConfirmFun
- case gb_sets:is_element(MsgSeqNo, UC) of
- true ->
- Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC),
- case QPid of
- undefined ->
- ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1});
- _ ->
- {ok, Qs} = dict:find(MsgSeqNo, QFM),
- Qs1 = sets:del_element(QPid, Qs),
- case sets:size(Qs1) of
- 0 -> ConfirmFun(MsgSeqNo,
- State#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM),
- unconfirmed = Unconfirmed1});
- _ -> State#ch{queues_for_msg =
- dict:store(MsgSeqNo, Qs1, QFM)}
- end
- end;
- false ->
- State
- end.
+confirm(MsgSeqNos, undefined, State = #ch{unconfirmed = UC,
+ queues_for_msg = QFM}) ->
+ MsgSeqNos1 = [MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)],
+ MS = gb_sets:from_list(MsgSeqNos),
+ QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM),
+ send_confirms(MsgSeqNos1, State#ch{unconfirmed = gb_sets:difference(UC, MS),
+ queues_for_msg = QFM1});
+confirm(MsgSeqNos, QPid, State) ->
+ {DoneMessages, State1} =
+ lists:foldl(
+ fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0,
+ queues_for_msg = QFM0}}) ->
+ case gb_sets:is_element(MsgSeqNo, UC0) of
+ false -> {DMs, State0};
+ true -> {ok, Qs} = dict:find(MsgSeqNo, QFM0),
+ Qs1 = sets:del_element(QPid, Qs),
+ case sets:size(Qs1) of
+ 0 -> {[MsgSeqNo | DMs],
+ State0#ch{
+ queues_for_msg =
+ dict:erase(MsgSeqNo, QFM0),
+ unconfirmed =
+ gb_sets:delete(MsgSeqNo, UC0)}};
+ _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
+ {DMs, State0#ch{queues_for_msg = QFM1}}
+ end
+ end
+ end, {[], State}, MsgSeqNos),
+ send_confirms(DoneMessages, State1).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -551,6 +537,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
+ check_user_id_header(DecodedContent#content.properties, State),
IsPersistent = is_message_persistent(DecodedContent),
{MsgSeqNo, State1}
= case ConfirmEnabled of
@@ -998,20 +985,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from tx to confirm mode", []);
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = false}) ->
- return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple},
+handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
+ return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = true,
- confirm_multiple = Multiple}) ->
- return_ok(State, NoWait, #'confirm.select_ok'{});
-
-handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot change confirm_multiple setting", []);
-
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
@@ -1241,12 +1218,12 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _,
@@ -1260,47 +1237,28 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL,
- ?MODULE, flush_confirms, [self()]),
- State#ch{confirm_tref = TRef};
-start_confirm_timer(State) ->
- State.
-
-stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+send_confirms([], State) ->
State;
-stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#ch{confirm_tref = undefined}.
-
-internal_flush_confirms(State = #ch{writer_pid = WriterPid,
- held_confirms = Cs}) ->
- case gb_sets:is_empty(Cs) of
- true -> State#ch{confirm_tref = undefined};
- false -> [First | Rest] = gb_sets:to_list(Cs),
- {Mult, Inds} = find_consecutive_sequence(First, Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult, multiple = true}),
- ok = lists:foldl(
- fun(T, ok) -> rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = T})
- end, ok, Inds),
- State#ch{held_confirms = gb_sets:new(),
- confirm_tref = undefined}
- end.
-
-%% Find longest sequence of consecutive numbers at the beginning.
-find_consecutive_sequence(Last, []) ->
- {Last, []};
-find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
- find_consecutive_sequence(N, Ns);
-find_consecutive_sequence(Last, Ns) ->
- {Last, Ns}.
+send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ SCs = lists:usort(Cs),
+ CutOff = case gb_sets:is_empty(UC) of
+ true -> lists:last(SCs) + 1;
+ false -> gb_sets:smallest(UC)
+ end,
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
+ case Ms of
+ [] -> ok;
+ _ -> ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
+ multiple = true})
+ end,
+ ok = lists:foldl(fun(T, ok) ->
+ rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = T})
+ end, ok, Ss),
+ State.
-terminate(State) ->
- stop_confirm_timer(State),
+terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
@@ -1309,7 +1267,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _) -> self();
i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
i(number, #ch{channel = Channel}) -> Channel;
-i(user, #ch{username = Username}) -> Username;
+i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 747d23d3..9f50176d 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -48,7 +48,7 @@
-type(start_link_args() ::
{rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_access_control:username(), rabbit_types:vhost(), pid()}).
+ rabbit_types:user(), rabbit_types:vhost(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -56,7 +56,7 @@
%%----------------------------------------------------------------------------
-start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
+start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
@@ -69,7 +69,7 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, Username, VHost,
+ [Channel, ReaderPid, WriterPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index ff3995b5..a6b1f7fa 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -78,4 +78,3 @@ reader(Pid) ->
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
-
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index df55d961..8a3275bc 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -201,48 +201,48 @@ action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
Inform("Creating user ~p", [Username]),
- call(Node, {rabbit_access_control, add_user, Args});
+ call(Node, {rabbit_auth_backend_internal, add_user, Args});
action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
Inform("Deleting user ~p", Args),
- call(Node, {rabbit_access_control, delete_user, Args});
+ call(Node, {rabbit_auth_backend_internal, delete_user, Args});
action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
- call(Node, {rabbit_access_control, change_password, Args});
+ call(Node, {rabbit_auth_backend_internal, change_password, Args});
action(clear_password, Node, Args = [Username], _Opts, Inform) ->
Inform("Clearing password for user ~p", [Username]),
- call(Node, {rabbit_access_control, clear_password, Args});
+ call(Node, {rabbit_auth_backend_internal, clear_password, Args});
action(set_admin, Node, [Username], _Opts, Inform) ->
Inform("Setting administrative status for user ~p", [Username]),
- call(Node, {rabbit_access_control, set_admin, [Username]});
+ call(Node, {rabbit_auth_backend_internal, set_admin, [Username]});
action(clear_admin, Node, [Username], _Opts, Inform) ->
Inform("Clearing administrative status for user ~p", [Username]),
- call(Node, {rabbit_access_control, clear_admin, [Username]});
+ call(Node, {rabbit_auth_backend_internal, clear_admin, [Username]});
action(list_users, Node, [], _Opts, Inform) ->
Inform("Listing users", []),
- display_list(call(Node, {rabbit_access_control, list_users, []}));
+ display_list(call(Node, {rabbit_auth_backend_internal, list_users, []}));
action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Creating vhost ~p", Args),
- call(Node, {rabbit_access_control, add_vhost, Args});
+ call(Node, {rabbit_vhost, add, Args});
action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Deleting vhost ~p", Args),
- call(Node, {rabbit_access_control, delete_vhost, Args});
+ call(Node, {rabbit_vhost, delete, Args});
action(list_vhosts, Node, [], _Opts, Inform) ->
Inform("Listing vhosts", []),
- display_list(call(Node, {rabbit_access_control, list_vhosts, []}));
+ display_list(call(Node, {rabbit_vhost, list, []}));
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
Inform("Listing permissions for user ~p", Args),
- display_list(call(Node, {rabbit_access_control, list_user_permissions,
- Args}));
+ display_list(call(Node, {rabbit_auth_backend_internal,
+ list_user_permissions, Args}));
action(list_queues, Node, Args, Opts, Inform) ->
Inform("Listing queues", []),
@@ -296,19 +296,20 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
- call(Node, {rabbit_access_control, set_permissions,
+ call(Node, {rabbit_auth_backend_internal, set_permissions,
[Username, VHost, CPerm, WPerm, RPerm]});
action(clear_permissions, Node, [Username], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
- call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]});
+ call(Node, {rabbit_auth_backend_internal, clear_permissions,
+ [Username, VHost]});
action(list_permissions, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
- display_list(call(Node, {rabbit_access_control, list_vhost_permissions,
- [VHost]})).
+ display_list(call(Node, {rabbit_auth_backend_internal,
+ list_vhost_permissions, [VHost]})).
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 06ba319b..15ba787a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,7 +46,7 @@
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
--export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
+-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
@@ -72,7 +72,7 @@
-ifdef(use_specs).
--export_type([resource_name/0]).
+-export_type([resource_name/0, thunk/1]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -137,10 +137,9 @@
(atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(with_user/2 :: (rabbit_access_control:username(), thunk(A)) -> A).
--spec(with_vhost/2 :: (rabbit_types:vhost(), thunk(A)) -> A).
+-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 ::
- (rabbit_access_control:username(), rabbit_types:vhost(), thunk(A))
+ (rabbit_types:username(), rabbit_types:vhost(), thunk(A))
-> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
@@ -366,19 +365,8 @@ with_user(Username, Thunk) ->
end
end.
-with_vhost(VHostPath, Thunk) ->
- fun () ->
- case mnesia:read({rabbit_vhost, VHostPath}) of
- [] ->
- mnesia:abort({no_such_vhost, VHostPath});
- [_V] ->
- Thunk()
- end
- end.
-
with_user_and_vhost(Username, VHostPath, Thunk) ->
- with_user(Username, with_vhost(VHostPath, Thunk)).
-
+ with_user(Username, rabbit_vhost:with(VHostPath, Thunk)).
execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 11f5e410..38cc82a6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -163,10 +163,10 @@ nodes_of_type(Type) ->
table_definitions() ->
[{rabbit_user,
- [{record_name, user},
- {attributes, record_info(fields, user)},
+ [{record_name, internal_user},
+ {attributes, record_info(fields, internal_user)},
{disc_copies, [node()]},
- {match, #user{_='_'}}]},
+ {match, #internal_user{_='_'}}]},
{rabbit_user_permission,
[{record_name, user_permission},
{attributes, record_info(fields, user_permission)},
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2e1834c7..deb62eb2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -81,6 +81,7 @@
file_summary_ets, %% tid of the file summary table
dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
+ dying_clients, %% set of dying clients
client_refs, %% set of references of all registered clients
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
@@ -306,6 +307,17 @@
%% sure that reads are not attempted from files which are in the
%% process of being garbage collected.
%%
+%% When a message is removed, its reference count is decremented. Even
+%% if the reference count becomes 0, its entry is not removed. This is
+%% because in the event of the same message being sent to several
+%% different queues, there is the possibility of one queue writing and
+%% removing the message before other queues write it at all. Thus
+%% accomodating 0-reference counts allows us to avoid unnecessary
+%% writes here. Of course, there are complications: the file to which
+%% the message has already been written could be locked pending
+%% deletion or GC, which means we have to rewrite the message as the
+%% original copy will now be lost.
+%%
%% The server automatically defers reads, removes and contains calls
%% that occur which refer to files which are currently being
%% GC'd. Contains calls are only deferred in order to ensure they do
@@ -323,6 +335,55 @@
%% heavily overloaded, clients can still write and read messages with
%% very low latency and not block at all.
%%
+%% Clients of the msg_store are required to register before using the
+%% msg_store. This provides them with the necessary client-side state
+%% to allow them to directly access the various caches and files. When
+%% they terminate, they should deregister. They can do this by calling
+%% either client_terminate/1 or client_delete_and_terminate/1. The
+%% differences are: (a) client_terminate is synchronous. As a result,
+%% if the msg_store is badly overloaded and has lots of in-flight
+%% writes and removes to process, this will take some time to
+%% return. However, once it does return, you can be sure that all the
+%% actions you've issued to the msg_store have been processed. (b) Not
+%% only is client_delete_and_terminate/1 asynchronous, but it also
+%% permits writes and subsequent removes from the current
+%% (terminating) client which are still in flight to be safely
+%% ignored. Thus from the point of view of the msg_store itself, and
+%% all from the same client:
+%%
+%% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N
+%% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 -->
+%%
+%% The client obviously sent T after all the other messages (up to
+%% W4), but because the msg_store prioritises messages, the T can be
+%% promoted and thus received early.
+%%
+%% Thus at the point of the msg_store receiving T, we have messages 1
+%% and 2 with a refcount of 1. After T, W3 will be ignored because
+%% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be
+%% ignored because the messages that they refer to were already known
+%% to the msg_store prior to T. However, it can be a little more
+%% complex: after the first R2, the refcount of msg 2 is 0. At that
+%% point, if a GC occurs or file deletion, msg 2 could vanish, which
+%% would then mean that the subsequent W2 and R2 are then ignored.
+%%
+%% The use case then for client_delete_and_terminate/1 is if the
+%% client wishes to remove everything it's written to the msg_store:
+%% it issues removes for all messages it's written and not removed,
+%% and then calls client_delete_and_terminate/1. At that point, any
+%% in-flight writes (and subsequent removes) can be ignored, but
+%% removes and writes for messages the msg_store already knows about
+%% will continue to be processed normally (which will normally just
+%% involve modifying the reference count, which is fast). Thus we save
+%% disk bandwidth for writes which are going to be immediately removed
+%% again by the the terminating client.
+%%
+%% We use a separate set to keep track of the dying clients in order
+%% to keep that set, which is inspected on every write and remove, as
+%% small as possible. Inspecting client_refs - the set of all clients
+%% - would degrade performance with many healthy clients and few, if
+%% any, dying clients, which is the typical case.
+%%
%% For notes on Clean Shutdown and startup, see documentation in
%% variable_queue.
@@ -361,6 +422,7 @@ client_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
+ ok = server_cast(CState, {client_dying, Ref}),
ok = server_cast(CState, {client_delete, Ref}).
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
@@ -598,6 +660,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
+ dying_clients = sets:new(),
client_refs = ClientRefs1,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
@@ -643,6 +706,7 @@ prioritise_cast(Msg, _State) ->
{combine_files, _Source, _Destination, _Reclaimed} -> 8;
{delete_file, _File, _Reclaimed} -> 8;
{set_maximum_since_use, _Age} -> 8;
+ {client_dying, _Pid} -> 7;
_ -> 0
end.
@@ -681,15 +745,22 @@ handle_call({contains, Guid}, From, State) ->
State1 = contains_message(Guid, From, State),
noreply(State1).
+handle_cast({client_dying, CRef},
+ State = #msstate { dying_clients = DyingClients }) ->
+ DyingClients1 = sets:add_element(CRef, DyingClients),
+ write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
+
handle_cast({client_delete, CRef},
- State = #msstate { client_refs = ClientRefs }) ->
- State1 = clear_client_callback(CRef, State),
- noreply(State1 #msstate {
- client_refs = sets:del_element(CRef, ClientRefs) });
+ State = #msstate { client_refs = ClientRefs,
+ dying_clients = DyingClients }) ->
+ State1 = clear_client_callback(
+ CRef, State #msstate {
+ client_refs = sets:del_element(CRef, ClientRefs),
+ dying_clients = sets:del_element(CRef, DyingClients) }),
+ noreply(remove_message(CRef, CRef, State1));
handle_cast({write, CRef, Guid},
- State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
+ State = #msstate { file_summary_ets = FileSummaryEts,
current_file = CurFile,
cur_file_cache_ets = CurFileCacheEts,
client_ondisk_callback = CODC,
@@ -705,41 +776,47 @@ handle_cast({write, CRef, Guid},
error -> CTG
end,
State1 = State #msstate { cref_to_guids = CTG1 },
- case index_lookup(Guid, State1) of
- not_found ->
+ case should_mask_action(CRef, Guid, State1) of
+ {true, _Location} ->
+ noreply(State1);
+ {false, not_found} ->
write_message(Guid, Msg, State1);
- #msg_location { ref_count = 0, file = File, total_size = TotalSize } ->
- case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
+ {Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize }} ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
ok = index_delete(Guid, State1),
write_message(Guid, Msg, State1);
- [#file_summary {}] ->
- ok = index_update_ref_count(Guid, 1, State1),
- [_] = ets:update_counter(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, TotalSize}]),
- noreply(State1 #msstate {
- sum_valid_data = SumValid + TotalSize })
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but as it is being GC'd currently,
+ %% we'll have to write a new copy, which will then
+ %% be younger, so ignore this write.
+ noreply(State1);
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ noreply(adjust_valid_total_size(File, TotalSize, State))
end;
- #msg_location { ref_count = RefCount, file = File } ->
+ {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
ok = index_update_ref_count(Guid, RefCount + 1, State1),
CTG2 = case {dict:find(CRef, CODC), File} of
{{ok, _}, CurFile} -> CTG1;
- {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)),
+ {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid),
+ written),
CTG;
_ -> CTG1
end,
- noreply(State #msstate { cref_to_guids = CTG2 })
+ noreply(State1 #msstate { cref_to_guids = CTG2 })
end;
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
- fun (Guid, State2) -> remove_message(Guid, State2) end,
+ fun (Guid, State2) -> remove_message(Guid, CRef, State2) end,
State, Guids),
- State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
- noreply(maybe_compact(State2));
+ noreply(maybe_compact(
+ client_confirm(CRef, gb_sets:from_list(Guids), removed, State1)));
handle_cast({release, Guids}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -861,9 +938,9 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #msstate { sync_timer_ref = undefined }.
-internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs,
- cref_to_guids = CTG }) ->
+internal_sync(State = #msstate { current_file_handle = CurHdl,
+ on_sync = Syncs,
+ cref_to_guids = CTG }) ->
State1 = stop_sync_timer(State),
CGs = dict:fold(fun (CRef, Guids, NS) ->
case gb_sets:is_empty(Guids) of
@@ -871,14 +948,14 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
false -> [{CRef, Guids} | NS]
end
end, [], CTG),
- if Syncs =:= [] andalso CGs =:= [] -> ok;
- true -> file_handle_cache:sync(CurHdl)
+ case {Syncs, CGs} of
+ {[], []} -> ok;
+ _ -> file_handle_cache:sync(CurHdl)
end,
- lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs],
+ [K() || K <- lists:reverse(Syncs)],
+ [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
-
write_message(Guid, Msg,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
@@ -990,34 +1067,43 @@ contains_message(Guid, From,
end
end.
-remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
- #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize } =
- index_lookup_positive_ref_count(Guid, State),
- %% only update field, otherwise bad interaction with concurrent GC
- Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
- case RefCount of
- %% don't remove from CUR_FILE_CACHE_ETS_NAME here because
- %% there may be further writes in the mailbox for the same
- %% msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
- case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true } ] ->
- add_to_pending_gc_completion({remove, Guid}, File, State);
- [#file_summary {}] ->
+remove_message(Guid, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts }) ->
+ case should_mask_action(CRef, Guid, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg
+ %% whilst it's being GC'd. ASSERTION:
+ %% [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }} when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec =
+ fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from CUR_FILE_CACHE_ETS_NAME here
+ %% because there may be further writes in the mailbox
+ %% for the same msg.
+ 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
+ case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, Guid, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(File, -TotalSize,
+ State))
+ end;
+ _ -> ok = decrement_cache(DedupCacheEts, Guid),
ok = Dec(),
- [_] = ets:update_counter(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, -TotalSize}]),
- delete_file_if_empty(
- File, State #msstate {
- sum_valid_data = SumValid - TotalSize })
- end;
- _ -> ok = decrement_cache(DedupCacheEts, Guid),
- ok = Dec(),
- State
+ State
+ end
end.
add_to_pending_gc_completion(
@@ -1039,8 +1125,8 @@ run_pending_action({read, Guid, From}, State) ->
read_message(Guid, From, State);
run_pending_action({contains, Guid, From}, State) ->
contains_message(Guid, From, State);
-run_pending_action({remove, Guid}, State) ->
- remove_message(Guid, State).
+run_pending_action({remove, Guid, CRef}, State) ->
+ remove_message(Guid, CRef, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
try
@@ -1051,15 +1137,22 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
+adjust_valid_total_size(File, Delta, State = #msstate {
+ sum_valid_data = SumValid,
+ file_summary_ets = FileSummaryEts }) ->
+ [_] = ets:update_counter(FileSummaryEts, File,
+ [{#file_summary.valid_total_size, Delta}]),
+ State #msstate { sum_valid_data = SumValid + Delta }.
+
orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-client_confirm(CRef, Guids,
+client_confirm(CRef, Guids, ActionTaken,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(Guids),
+ {ok, Fun} -> Fun(Guids, ActionTaken),
CTG1 = case dict:find(CRef, CTG) of
{ok, Gs} ->
Guids1 = gb_sets:difference(Gs, Guids),
@@ -1073,6 +1166,29 @@ client_confirm(CRef, Guids,
error -> State
end.
+%% Detect whether the Guid is older or younger than the client's death
+%% msg (if there is one). If the msg is older than the client death
+%% msg, and it has a 0 ref_count we must only alter the ref_count, not
+%% rewrite the msg - rewriting it would make it younger than the death
+%% msg and thus should be ignored. Note that this (correctly) returns
+%% false when testing to remove the death msg itself.
+should_mask_action(CRef, Guid,
+ State = #msstate { dying_clients = DyingClients }) ->
+ case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of
+ {false, Location} ->
+ {false, Location};
+ {true, not_found} ->
+ {true, not_found};
+ {true, #msg_location { file = File, offset = Offset,
+ ref_count = RefCount } = Location} ->
+ #msg_location { file = DeathFile, offset = DeathOffset } =
+ index_lookup(CRef, State),
+ {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
+ {true, _} -> true;
+ {false, 0} -> false_if_increment;
+ {false, _} -> false
+ end, Location}
+ end.
%%----------------------------------------------------------------------------
%% file helper functions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 89954b06..c6a083bb 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -32,7 +32,7 @@
-module(rabbit_net).
-include("rabbit.hrl").
--export([is_ssl/1, controlling_process/2, getstat/2,
+-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
async_recv/3, port_command/2, send/2, close/1,
sockname/1, peername/1, peercert/1]).
@@ -50,6 +50,9 @@
-type(socket() :: port() | #ssl_socket{}).
-spec(is_ssl/1 :: (socket()) -> boolean()).
+-spec(ssl_info/1 :: (socket())
+ -> 'nossl' | ok_val_or_error(
+ {atom(), {atom(), atom(), atom()}})).
-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
-spec(getstat/2 ::
(socket(), [stat_option()])
@@ -77,6 +80,11 @@
is_ssl(Sock) -> ?IS_SSL(Sock).
+ssl_info(Sock) when ?IS_SSL(Sock) ->
+ ssl:connection_info(Sock#ssl_socket.ssl);
+ssl_info(_Sock) ->
+ nossl.
+
controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
controlling_process(Sock, Pid) when is_port(Sock) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76c0a4ef..2162104f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -33,7 +33,7 @@
-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
+ publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
@@ -297,11 +297,12 @@ deliver(SeqIds, State) ->
ack(SeqIds, State) ->
deliver_or_ack(ack, SeqIds, State).
-sync([], State) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+%% This is only called when there are outstanding confirms and the
+%% queue is idle.
+sync(State = #qistate { unsynced_guids = Guids }) ->
+ sync_if([] =/= Guids, State).
+
+sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack in
%% the transaction. Ideally we should go through these seqids and
%% only sync the journal if the pubs or acks appear in the
@@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% the variable queue publishes and acks to the qi, and then
%% syncs, all in one operation, there is no possibility of the
%% seqids not being in the journal, provided the transaction isn't
- %% emptied (handled above anyway).
- ok = file_handle_cache:sync(JournalHdl),
- notify_sync(State).
+ %% emptied (handled by sync_if anyway).
+ sync_if([] =/= SeqIds, State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+sync_if(false, State) ->
+ State;
+sync_if(_Bool, State = #qistate { journal_handle = undefined }) ->
+ State;
+sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ notify_sync(State).
+
notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(gb_sets:from_list(UG)),
State #qistate { unsynced_guids = [] }.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 74e5fc77..9d44cabd 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -65,6 +65,8 @@
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism,
+ ssl_protocol, ssl_key_exchange,
+ ssl_cipher, ssl_hash,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -742,17 +744,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- SendFun =
- fun() ->
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- catch rabbit_net:send(Sock, Frame)
- end,
-
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
- ReceiveFun =
- fun() ->
- Parent ! timeout
- end,
+ ReceiveFun = fun() -> Parent ! timeout end,
Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
@@ -879,6 +874,14 @@ i(peer_port, #v1{sock = Sock}) ->
socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
i(ssl, #v1{sock = Sock}) ->
rabbit_net:is_ssl(Sock);
+i(ssl_protocol, #v1{sock = Sock}) ->
+ ssl_info(fun ({P, _}) -> P end, Sock);
+i(ssl_key_exchange, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {K, _, _}}) -> K end, Sock);
+i(ssl_cipher, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {_, C, _}}) -> C end, Sock);
+i(ssl_hash, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {_, _, H}}) -> H end, Sock);
i(peer_cert_issuer, #v1{sock = Sock}) ->
cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
i(peer_cert_subject, #v1{sock = Sock}) ->
@@ -929,6 +932,13 @@ socket_info(Get, Select) ->
{error, _} -> ''
end.
+ssl_info(F, Sock) ->
+ case rabbit_net:ssl_info(Sock) of
+ nossl -> '';
+ {error, _} -> '';
+ {ok, Info} -> F(Info)
+ end.
+
cert_info(F, Sock) ->
case rabbit_net:peercert(Sock) of
nossl -> '';
@@ -943,12 +953,12 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
channel_sup_sup_pid = ChanSupSup,
connection = #connection{protocol = Protocol,
frame_max = FrameMax,
- user = #user{username = Username},
+ user = User,
vhost = VHost}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {Protocol, Sock, Channel, FrameMax,
- self(), Username, VHost, Collector}),
+ self(), User, VHost, Collector}),
erlang:monitor(process, ChPid),
put({channel, Channel}, {ChPid, AState}),
put({ch_pid, ChPid}, Channel),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index eca748a9..d913092c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1030,7 +1030,7 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- <<"user">>, <<"/">>, self(),
+ user(<<"user">>), <<"/">>, self(),
fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
@@ -1090,7 +1090,7 @@ test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
{ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
- <<"guest">>, <<"/">>, self(),
+ user(<<"guest">>), <<"/">>, self(),
fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
@@ -1098,6 +1098,13 @@ test_spawn(Receiver) ->
end,
{Writer, Ch}.
+user(Username) ->
+ #user{username = Username,
+ is_admin = true,
+ auth_backend = rabbit_auth_backend_internal,
+ impl = #internal_user{username = Username,
+ is_admin = true}}.
+
test_statistics_receiver(Pid) ->
receive
shutdown ->
@@ -1689,7 +1696,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
false -> ?TRANSIENT_MSG_STORE
end,
MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined),
- {A, B} =
+ {A, B = [{_SeqId, LastGuidWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
Guid = rabbit_guid:guid(),
@@ -1698,6 +1705,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
ok = rabbit_msg_store:write(Guid, Guid, MSCState),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]}
end, {Qi, []}, SeqIds),
+ %% do this just to force all of the publishes through to the msg_store:
+ true = rabbit_msg_store:contains(LastGuidWritten, MSCState),
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
{A, B}.
@@ -1881,7 +1890,7 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
VQ = rabbit_variable_queue:init(test_queue(), true, false,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1983,7 +1992,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1993,7 +2002,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2027,7 +2036,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2057,7 +2066,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2074,7 +2083,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2105,7 +2114,7 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = rabbit_variable_queue:init(QName, true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
@@ -2167,3 +2176,4 @@ test_configurable_server_properties() ->
passed.
nop(_) -> ok.
+nop(_, _) -> ok.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 548014be..70d18d7a 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -42,8 +42,9 @@
vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0,
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
- connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1,
- ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
+ connection/0, protocol/0, user/0, internal_user/0,
+ username/0, password/0, password_hash/0, ok/1, error/1,
+ ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
connection_exit/0]).
-type(channel_exit() :: no_return()).
@@ -151,9 +152,19 @@
-type(protocol() :: rabbit_framing:protocol()).
-type(user() ::
- #user{username :: rabbit_access_control:username(),
- password_hash :: rabbit_access_control:password_hash(),
- is_admin :: boolean()}).
+ #user{username :: username(),
+ is_admin :: boolean(),
+ auth_backend :: atom(),
+ impl :: any()}).
+
+-type(internal_user() ::
+ #internal_user{username :: username(),
+ password_hash :: password_hash(),
+ is_admin :: boolean()}).
+
+-type(username() :: binary()).
+-type(password() :: binary()).
+-type(password_hash() :: binary()).
-type(ok(A) :: {'ok', A}).
-type(error(A) :: {'error', A}).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 7848c848..b5ff2b12 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -28,6 +28,7 @@
-rabbit_upgrade({hash_passwords, []}).
-rabbit_upgrade({add_ip_to_listener, []}).
-rabbit_upgrade({internal_exchanges, []}).
+-rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
%% -------------------------------------------------------------------
@@ -37,6 +38,7 @@
-spec(hash_passwords/0 :: () -> 'ok').
-spec(add_ip_to_listener/0 :: () -> 'ok').
-spec(internal_exchanges/0 :: () -> 'ok').
+-spec(user_to_internal_user/0 :: () -> 'ok').
-endif.
@@ -60,7 +62,7 @@ hash_passwords() ->
mnesia(
rabbit_user,
fun ({user, Username, Password, IsAdmin}) ->
- Hash = rabbit_access_control:hash_password(Password),
+ Hash = rabbit_auth_backend_internal:hash_password(Password),
{user, Username, Hash, IsAdmin}
end,
[username, password_hash, is_admin]).
@@ -85,8 +87,21 @@ internal_exchanges() ->
|| T <- Tables ],
ok.
+user_to_internal_user() ->
+ mnesia(
+ rabbit_user,
+ fun({user, Username, PasswordHash, IsAdmin}) ->
+ {internal_user, Username, PasswordHash, IsAdmin}
+ end,
+ [username, password_hash, is_admin], internal_user).
+
%%--------------------------------------------------------------------
mnesia(TableName, Fun, FieldList) ->
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList),
ok.
+
+mnesia(TableName, Fun, FieldList, NewRecordName) ->
+ {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList,
+ NewRecordName),
+ ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 565c61e7..b6681c6b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -412,7 +412,9 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
init(QueueName, IsDurable, Recover,
- fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
+ fun (Guids, ActionTaken) ->
+ msgs_written_to_disk(Self, Guids, ActionTaken)
+ end,
fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
@@ -519,7 +521,9 @@ publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, #basic_message { guid = Guid },
+ _MsgProps, State = #vqstate { len = 0 }) ->
+ blind_confirm(self(), gb_sets:singleton(Guid)),
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
@@ -531,20 +535,20 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, a(reduce_memory_use(
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
- unconfirmed = Unconfirmed1 }))}.
+ unconfirmed = UC1 }))}.
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -654,15 +658,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- {Guids, State1} =
- ack(fun msg_store_remove/3,
- fun ({_IsPersistent, Guid, _MsgProps}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1);
- (#msg_status{msg = #basic_message { guid = Guid }}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1)
- end,
- AckTags, State),
- {Guids, a(State1)}.
+ a(ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
@@ -712,7 +710,7 @@ tx_commit(Txn, Fun, MsgPropsFun,
end)}.
requeue(AckTags, MsgPropsFun, State) ->
- {_Guids, State1} =
+ a(reduce_memory_use(
ack(fun msg_store_release/3,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
@@ -727,8 +725,7 @@ requeue(AckTags, MsgPropsFun, State) ->
true, true, State2),
State3
end,
- AckTags, State),
- a(reduce_memory_use(State1)).
+ AckTags, State))).
len(#vqstate { len = Len }) -> Len.
@@ -812,17 +809,22 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State),
- Res;
-needs_idle_timeout(_State) ->
- true.
+needs_idle_timeout(State = #vqstate { on_sync = OnSync, unconfirmed = UC }) ->
+ case {OnSync, gb_sets:is_empty(UC)} of
+ {?BLANK_SYNC, true} ->
+ {Res, _State} = reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State),
+ Res;
+ _ ->
+ true
+ end.
-idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
+idle_timeout(State) ->
+ a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -1160,7 +1162,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- {_Guids, NewState} = ack(Acks, State),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
@@ -1172,7 +1173,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
{SeqId, State3} =
publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, NewState}, Pubs),
+ end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1236,7 +1237,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
persistent_count = PCount,
durable = IsDurable,
ram_msg_count = RamMsgCount,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
@@ -1246,13 +1247,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
ram_msg_count = RamMsgCount + 1,
- unconfirmed = Unconfirmed1 }}.
+ unconfirmed = UC1 }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
@@ -1323,7 +1324,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, GuidsByStore, _AllGuids} =
+ {PersistentSeqIds, GuidsByStore} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1342,9 +1343,9 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- {[], State};
+ State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, GuidsByStore, AllGuids},
+ {{PersistentSeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1364,24 +1365,21 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|| {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {lists:reverse(AllGuids),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }.
-accumulate_ack_init() -> {[], orddict:new(), []}.
+accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false,
- guid = Guid },
- {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
- {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]};
+ index_on_disk = false },
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore};
accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
- {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore),
- [Guid | AllGuids]}.
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1393,6 +1391,14 @@ find_persistent_count(LensByStore) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
+confirm_commit_index(State = #vqstate { unconfirmed = UC,
+ index_state = IndexState }) ->
+ case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> State #vqstate {
+ index_state = rabbit_queue_index:sync(IndexState) }
+ end.
+
remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1403,7 +1409,13 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msgs_confirmed(GuidSet, State) ->
{gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
-msgs_written_to_disk(QPid, GuidSet) ->
+blind_confirm(QPid, GuidSet) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
+
+msgs_written_to_disk(QPid, GuidSet, removed) ->
+ blind_confirm(QPid, GuidSet);
+msgs_written_to_disk(QPid, GuidSet, written) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
new file mode 100644
index 00000000..f939a3fe
--- /dev/null
+++ b/src/rabbit_vhost.erl
@@ -0,0 +1,122 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_vhost).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-export([add/1, delete/1, exists/1, list/0, with/2]).
+
+-ifdef(use_specs).
+
+-spec(add/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(delete/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(exists/1 :: (rabbit_types:vhost()) -> boolean()).
+-spec(list/0 :: () -> [rabbit_types:vhost()]).
+-spec(with/2 :: (rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+add(VHostPath) ->
+ R = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_vhost, VHostPath}) of
+ [] ->
+ ok = mnesia:write(rabbit_vhost,
+ #vhost{virtual_host = VHostPath},
+ write),
+ [rabbit_exchange:declare(
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
+ ok;
+ [_] ->
+ mnesia:abort({vhost_already_exists, VHostPath})
+ end
+ end),
+ rabbit_log:info("Added vhost ~p~n", [VHostPath]),
+ R.
+
+delete(VHostPath) ->
+ %%FIXME: We are forced to delete the queues outside the TX below
+ %%because queue deletion involves sending messages to the queue
+ %%process, which in turn results in further mnesia actions and
+ %%eventually the termination of that process.
+ lists:foreach(fun (Q) ->
+ {ok,_} = rabbit_amqqueue:delete(Q, false, false)
+ end,
+ rabbit_amqqueue:list(VHostPath)),
+ R = rabbit_misc:execute_mnesia_transaction(
+ with(VHostPath, fun () ->
+ ok = internal_delete(VHostPath)
+ end)),
+ rabbit_log:info("Deleted vhost ~p~n", [VHostPath]),
+ R.
+
+internal_delete(VHostPath) ->
+ lists:foreach(fun (#exchange{name = Name}) ->
+ ok = rabbit_exchange:delete(Name, false)
+ end,
+ rabbit_exchange:list(VHostPath)),
+ lists:foreach(
+ fun ({Username, _, _, _}) ->
+ ok = rabbit_auth_backend_internal:clear_permissions(Username,
+ VHostPath)
+ end,
+ rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)),
+ ok = mnesia:delete({rabbit_vhost, VHostPath}),
+ ok.
+
+exists(VHostPath) ->
+ mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].
+
+list() ->
+ mnesia:dirty_all_keys(rabbit_vhost).
+
+with(VHostPath, Thunk) ->
+ fun () ->
+ case mnesia:read({rabbit_vhost, VHostPath}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHostPath});
+ [_V] ->
+ Thunk()
+ end
+ end.