diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-12 14:56:54 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-12 14:56:54 +0000 |
commit | 09c5c41ef07eb52c08d493e646f141d9181a2e6a (patch) | |
tree | e18b564b983c34c9326a6d03d4ecda12ef4511f2 | |
parent | b7e4bb978ddac088e0151037db35c8cf2fd4667e (diff) | |
parent | 5990a0a00e7ec2c79e75c0f62bf89311bee3036c (diff) | |
download | rabbitmq-server-09c5c41ef07eb52c08d493e646f141d9181a2e6a.tar.gz |
merge default into bug23643
29 files changed, 1269 insertions, 759 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 01ddd4c1..2152cab3 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -398,7 +398,12 @@ <refsect2> <title>User management</title> - + <para> + Note that <command>rabbitmqctl</command> manages the RabbitMQ + internal user database. Users from any alternative + authentication backend will not be visible + to <command>rabbitmqctl</command>. + </para> <variablelist> <varlistentry> <term><cmdsynopsis><command>add_user</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>password</replaceable></arg></cmdsynopsis></term> @@ -544,7 +549,12 @@ <refsect2> <title>Access control</title> - + <para> + Note that <command>rabbitmqctl</command> manages the RabbitMQ + internal user database. Permissions for users from any + alternative authorisation backend will not be visible + to <command>rabbitmqctl</command>. + </para> <variablelist> <varlistentry> <term><cmdsynopsis><command>add_vhost</command> <arg choice="req"><replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> @@ -1000,6 +1010,26 @@ connection is secured with SSL.</para></listitem> </varlistentry> <varlistentry> + <term>ssl_protocol</term> + <listitem><para>SSL protocol + (e.g. tlsv1)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_key_exchange</term> + <listitem><para>SSL key exchange algorithm + (e.g. rsa)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_cipher</term> + <listitem><para>SSL cipher algorithm + (e.g. aes_256_cbc)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_hash</term> + <listitem><para>SSL hash function + (e.g. sha)</para></listitem> + </varlistentry> + <varlistentry> <term>peer_cert_subject</term> <listitem><para>The subject of the peer's SSL certificate, in RFC4514 form.</para></listitem> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 25d630c0..5ed872b6 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -32,4 +32,5 @@ {server_properties, []}, {collect_statistics, none}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, + {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 8c8e12a1..81c3996b 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -29,7 +29,13 @@ %% Contributor(s): ______________________________________. %% --record(user, {username, password_hash, is_admin}). +-record(user, {username, + is_admin, + auth_backend, %% Module this user came from + impl %% Scratch space for that module + }). + +-record(internal_user, {username, password_hash, is_admin}). -record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl new file mode 100644 index 00000000..a96c18d8 --- /dev/null +++ b/include/rabbit_auth_backend_spec.hrl @@ -0,0 +1,46 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-ifdef(use_specs). + +-spec(description/0 :: () -> [{atom(), any()}]). + +-spec(check_user_login/2 :: (rabbit_types:username(), [term()]) -> + {'ok', rabbit_types:user()} | + {'refused', string(), [any()]} | + {'error', any()}). +-spec(check_vhost_access/3 :: (rabbit_types:user(), rabbit_types:vhost(), + rabbit_access_control:vhost_permission_atom()) -> + boolean() | {'error', any()}). +-spec(check_resource_access/3 :: (rabbit_types:user(), + rabbit_types:r(atom()), + rabbit_access_control:permission_atom()) -> + boolean() | {'error', any()}). +-endif. diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index f67c6f46..6fa34ccc 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -58,7 +58,7 @@ (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). --spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). +-spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index a4f8c8b4..ec61dc99 100644 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -89,7 +89,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( -pa "!TDP0!..\ebin" ^
-noinput -hidden ^
!RABBITMQ_MULTI_ERL_ARGS! ^
--sname rabbitmq_multi ^
+-sname rabbitmq_multi!RANDOM! ^
!RABBITMQ_CONFIG_ARG! ^
-s rabbit_multi ^
!RABBITMQ_MULTI_START_ARGS! ^
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 52a250c6..ec5b4d85 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -118,7 +118,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
--sname rabbitmqprelaunch%RANDOM% ^
+-sname rabbitmqprelaunch!RANDOM! ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 563b9e58..4ffde73f 100644 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -58,7 +58,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/rabbit.erl b/src/rabbit.erl index 2ebfdecf..954e289b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -458,16 +458,16 @@ insert_default_data() -> {ok, DefaultVHost} = application:get_env(default_vhost), {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = application:get_env(default_permissions), - ok = rabbit_access_control:add_vhost(DefaultVHost), - ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), + ok = rabbit_vhost:add(DefaultVHost), + ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass), case DefaultAdmin of - true -> rabbit_access_control:set_admin(DefaultUser); + true -> rabbit_auth_backend_internal:set_admin(DefaultUser); _ -> ok end, - ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, - DefaultConfigurePerm, - DefaultWritePerm, - DefaultReadPerm), + ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost, + DefaultConfigurePerm, + DefaultWritePerm, + DefaultReadPerm), ok. rotate_logs(File, Suffix, Handler) -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 51adbac8..02a65442 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -30,70 +30,35 @@ %% -module(rabbit_access_control). --include_lib("stdlib/include/qlc.hrl"). + -include("rabbit.hrl"). --export([user_pass_login/2, check_user_pass_login/2, make_salt/0, - check_vhost_access/2, check_resource_access/3]). --export([add_user/2, delete_user/1, change_password/2, set_admin/1, - clear_admin/1, list_users/0, lookup_user/1, clear_password/1]). --export([change_password_hash/2, hash_password/1]). --export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). --export([set_permissions/5, clear_permissions/2, - list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, - list_user_vhost_permissions/2]). +-export([user_pass_login/2, check_user_pass_login/2, check_user_login/2, + check_vhost_access/2, check_resource_access/3, list_vhosts/2]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([username/0, password/0, password_hash/0]). +-export_type([permission_atom/0, vhost_permission_atom/0]). -type(permission_atom() :: 'configure' | 'read' | 'write'). --type(username() :: binary()). --type(password() :: binary()). --type(password_hash() :: binary()). --type(regexp() :: binary()). +-type(vhost_permission_atom() :: 'read' | 'write'). + -spec(user_pass_login/2 :: - (username(), password()) + (rabbit_types:username(), rabbit_types:password()) -> rabbit_types:user() | rabbit_types:channel_exit()). -spec(check_user_pass_login/2 :: - (username(), password()) + (rabbit_types:username(), rabbit_types:password()) -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). --spec(make_salt/0 :: () -> binary()). -spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> 'ok' | rabbit_types:channel_exit()). -spec(check_resource_access/3 :: - (username(), rabbit_types:r(atom()), permission_atom()) + (rabbit_types:user(), rabbit_types:r(atom()), permission_atom()) -> 'ok' | rabbit_types:channel_exit()). --spec(add_user/2 :: (username(), password()) -> 'ok'). --spec(delete_user/1 :: (username()) -> 'ok'). --spec(change_password/2 :: (username(), password()) -> 'ok'). --spec(clear_password/1 :: (username()) -> 'ok'). --spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok'). --spec(hash_password/1 :: (password()) -> password_hash()). --spec(set_admin/1 :: (username()) -> 'ok'). --spec(clear_admin/1 :: (username()) -> 'ok'). --spec(list_users/0 :: () -> [{username(), boolean()}]). --spec(lookup_user/1 :: - (username()) -> rabbit_types:ok(rabbit_types:user()) - | rabbit_types:error('not_found')). --spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). --spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). --spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()). --spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). --spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), - regexp(), regexp()) -> 'ok'). --spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). --spec(list_permissions/0 :: - () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]). --spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]). --spec(list_user_permissions/1 :: - (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). --spec(list_user_vhost_permissions/2 :: - (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]). +-spec(list_vhosts/2 :: (rabbit_types:user(), vhost_permission_atom()) + -> [rabbit_types:vhost()]). -endif. @@ -109,314 +74,79 @@ user_pass_login(User, Pass) -> U end. -check_user_pass_login(Username, Pass) -> - Refused = {refused, "user '~s' - invalid credentials", [Username]}, - case lookup_user(Username) of - {ok, User} -> - case check_password(Pass, User#user.password_hash) of - true -> {ok, User}; - _ -> Refused - end; - {error, not_found} -> - Refused - end. - -internal_lookup_vhost_access(Username, VHostPath) -> - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHostPath}}) of - [] -> not_found; - [R] -> {ok, R} - end - end). - -check_vhost_access(#user{username = Username}, VHostPath) -> +check_user_pass_login(Username, Password) -> + check_user_login(Username, [{password, Password}]). + +check_user_login(Username, AuthProps) -> + {ok, Modules} = application:get_env(rabbit, auth_backends), + lists:foldl( + fun(Module, {refused, _, _}) -> + case Module:check_user_login(Username, AuthProps) of + {error, E} -> + {refused, "~s failed authenticating ~s: ~p~n", + [Module, Username, E]}; + Else -> + Else + end; + (_, {ok, User}) -> + {ok, User} + end, {refused, "No modules checked '~s'", [Username]}, Modules). + +check_vhost_access(User = #user{ username = Username, + auth_backend = Module }, VHostPath) -> ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]), - case internal_lookup_vhost_access(Username, VHostPath) of - {ok, _R} -> - ok; - not_found -> - rabbit_misc:protocol_error( - access_refused, "access to vhost '~s' refused for user '~s'", - [VHostPath, Username]) - end. - -permission_index(configure) -> #permission.configure; -permission_index(write) -> #permission.write; -permission_index(read) -> #permission.read. - -check_resource_access(Username, - R = #resource{kind = exchange, name = <<"">>}, + check_access( + fun() -> + rabbit_vhost:exists(VHostPath) andalso + Module:check_vhost_access(User, VHostPath, write) + end, + "~s failed checking vhost access to ~s for ~s: ~p~n", + [Module, VHostPath, Username], + "access to vhost '~s' refused for user '~s'", + [VHostPath, Username]). + +check_resource_access(User, R = #resource{kind = exchange, name = <<"">>}, Permission) -> - check_resource_access(Username, - R#resource{name = <<"amq.default">>}, + check_resource_access(User, R#resource{name = <<"amq.default">>}, Permission); -check_resource_access(Username, - R = #resource{virtual_host = VHostPath, name = Name}, - Permission) -> - Res = case mnesia:dirty_read({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHostPath}}) of - [] -> - false; - [#user_permission{permission = P}] -> - PermRegexp = - case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; - RE -> RE - end, - case re:run(Name, PermRegexp, [{capture, none}]) of - match -> true; - nomatch -> false - end - end, - if Res -> ok; - true -> rabbit_misc:protocol_error( - access_refused, "access to ~s refused for user '~s'", - [rabbit_misc:rs(R), Username]) - end. - -add_user(Username, Password) -> - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_user, Username}) of - [] -> - ok = mnesia:write(rabbit_user, - #user{username = Username, - password_hash = - hash_password(Password), - is_admin = false}, - write); - _ -> - mnesia:abort({user_already_exists, Username}) - end - end), - rabbit_log:info("Created user ~p~n", [Username]), - R. - -delete_user(Username) -> - R = rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - ok = mnesia:delete({rabbit_user, Username}), - [ok = mnesia:delete_object( - rabbit_user_permission, R, write) || - R <- mnesia:match_object( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = '_'}, - permission = '_'}, - write)], - ok - end)), - rabbit_log:info("Deleted user ~p~n", [Username]), - R. - -change_password(Username, Password) -> - change_password_hash(Username, hash_password(Password)). - -clear_password(Username) -> - change_password_hash(Username, <<"">>). - -change_password_hash(Username, PasswordHash) -> - R = update_user(Username, fun(User) -> - User#user{ password_hash = PasswordHash } - end), - rabbit_log:info("Changed password for user ~p~n", [Username]), - R. - -hash_password(Cleartext) -> - Salt = make_salt(), - Hash = salted_md5(Salt, Cleartext), - <<Salt/binary, Hash/binary>>. - -check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) -> - Hash =:= salted_md5(Salt, Cleartext). - -make_salt() -> - {A1,A2,A3} = now(), - random:seed(A1, A2, A3), - Salt = random:uniform(16#ffffffff), - <<Salt:32>>. - -salted_md5(Salt, Cleartext) -> - Salted = <<Salt/binary, Cleartext/binary>>, - erlang:md5(Salted). - -set_admin(Username) -> - set_admin(Username, true). - -clear_admin(Username) -> - set_admin(Username, false). - -set_admin(Username, IsAdmin) -> - R = update_user(Username, fun(User) -> - User#user{is_admin = IsAdmin} - end), - rabbit_log:info("Set user admin flag for user ~p to ~p~n", - [Username, IsAdmin]), - R. - -update_user(Username, Fun) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - {ok, User} = lookup_user(Username), - ok = mnesia:write(rabbit_user, Fun(User), write) - end)). - -list_users() -> - [{Username, IsAdmin} || - #user{username = Username, is_admin = IsAdmin} <- - mnesia:dirty_match_object(rabbit_user, #user{_ = '_'})]. - -lookup_user(Username) -> - rabbit_misc:dirty_read({rabbit_user, Username}). - -add_vhost(VHostPath) -> - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_vhost, VHostPath}) of - [] -> - ok = mnesia:write(rabbit_vhost, - #vhost{virtual_host = VHostPath}, - write), - [rabbit_exchange:declare( - rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}]], - ok; - [_] -> - mnesia:abort({vhost_already_exists, VHostPath}) - end - end), - rabbit_log:info("Added vhost ~p~n", [VHostPath]), - R. - -delete_vhost(VHostPath) -> - %%FIXME: We are forced to delete the queues outside the TX below - %%because queue deletion involves sending messages to the queue - %%process, which in turn results in further mnesia actions and - %%eventually the termination of that process. - lists:foreach(fun (Q) -> - {ok,_} = rabbit_amqqueue:delete(Q, false, false) - end, - rabbit_amqqueue:list(VHostPath)), - R = rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_vhost( - VHostPath, - fun () -> - ok = internal_delete_vhost(VHostPath) - end)), - rabbit_log:info("Deleted vhost ~p~n", [VHostPath]), - R. - -internal_delete_vhost(VHostPath) -> - lists:foreach(fun (#exchange{name = Name}) -> - ok = rabbit_exchange:delete(Name, false) - end, - rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _, _}) -> - ok = clear_permissions(Username, VHostPath) - end, - list_vhost_permissions(VHostPath)), - ok = mnesia:delete({rabbit_vhost, VHostPath}), - ok. - -vhost_exists(VHostPath) -> - mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. - -list_vhosts() -> - mnesia:dirty_all_keys(rabbit_vhost). - -validate_regexp(RegexpBin) -> - Regexp = binary_to_list(RegexpBin), - case re:compile(Regexp) of - {ok, _} -> ok; - {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) +check_resource_access(User = #user{username = Username, auth_backend = Module}, + Resource, Permission) -> + check_access( + fun() -> Module:check_resource_access(User, Resource, Permission) end, + "~s failed checking resource access to ~p for ~s: ~p~n", + [Module, Resource, Username], + "access to ~s refused for user '~s'", + [rabbit_misc:rs(Resource), Username]). + +check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) -> + Allow = case Fun() of + {error, _} = E -> + rabbit_log:error(ErrStr, ErrArgs ++ [E]), + false; + Else -> + Else + end, + case Allow of + true -> + ok; + false -> + rabbit_misc:protocol_error(access_refused, RefStr, RefArgs) end. -set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> - lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user_and_vhost( - Username, VHostPath, - fun () -> ok = mnesia:write( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = VHostPath}, - permission = #permission{ - configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}}, - write) - end)). - - -clear_permissions(Username, VHostPath) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user_and_vhost( - Username, VHostPath, - fun () -> - ok = mnesia:delete({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHostPath}}) - end)). - -list_permissions() -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || - {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- - list_permissions(match_user_vhost('_', '_'))]. - -list_vhost_permissions(VHostPath) -> - [{Username, ConfigurePerm, WritePerm, ReadPerm} || - {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- - list_permissions(rabbit_misc:with_vhost( - VHostPath, match_user_vhost('_', VHostPath)))]. - -list_user_permissions(Username) -> - [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || - {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- - list_permissions(rabbit_misc:with_user( - Username, match_user_vhost(Username, '_')))]. - -list_user_vhost_permissions(Username, VHostPath) -> - [{ConfigurePerm, WritePerm, ReadPerm} || - {_, _, ConfigurePerm, WritePerm, ReadPerm} <- - list_permissions(rabbit_misc:with_user_and_vhost( - Username, VHostPath, - match_user_vhost(Username, VHostPath)))]. - -list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || - #user_permission{user_vhost = #user_vhost{username = Username, - virtual_host = VHostPath}, - permission = #permission{ configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction(QueryThunk)]. - -match_user_vhost(Username, VHostPath) -> - fun () -> mnesia:match_object( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = VHostPath}, - permission = '_'}, - read) - end. +%% Permission = write -> log in +%% Permission = read -> learn of the existence of (only relevant for +%% management plugin) +list_vhosts(User = #user{username = Username, auth_backend = Module}, + Permission) -> + lists:filter( + fun(VHost) -> + case Module:check_vhost_access(User, VHost, Permission) of + {error, _} = E -> + rabbit_log:warning("~w failed checking vhost access " + "to ~s for ~s: ~p~n", + [Module, VHost, Username, E]), + false; + Else -> + Else + end + end, rabbit_vhost:list()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 981dd31d..38b83117 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -374,12 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - {State2, ChAckTags1} = + ChAckTags1 = case AckRequired of - true -> {State1, - sets:add_element(AckTag, ChAckTags)}; - false -> {confirm_message(Message, State1), - ChAckTags} + true -> sets:add_element(AckTag, ChAckTags); + false -> ChAckTags end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, @@ -396,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State3 = State2#q{ + State2 = State1#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State3); + deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> true = maybe_store_ch_record(C#cr{is_limit_active = true}), @@ -427,22 +425,36 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -confirm_messages(Guids, State) -> - lists:foldl(fun confirm_message_by_guid/2, State, Guids). - -confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> - case dict:find(Guid, GTC) of - {ok, {_ , undefined}} -> ok; - {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok +confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> + {CMs, GTC1} = + lists:foldl( + fun(Guid, {CMs, GTC0}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)}; + _ -> + {CMs, GTC0} + end + end, {[], GTC}, Guids), + case lists:usort(CMs) of + [{Ch, MsgSeqNo} | CMs1] -> + [rabbit_channel:confirm(ChPid, MsgSeqNos) || + {ChPid, MsgSeqNos} <- group_confirms_by_channel( + CMs1, [{Ch, [MsgSeqNo]}])]; + [] -> + ok end, - State#q{guid_to_channel = dict:erase(Guid, GTC)}. + State#q{guid_to_channel = GTC1}. -confirm_message(#basic_message{guid = Guid}, State) -> - confirm_message_by_guid(Guid, State). +group_confirms_by_channel([], Acc) -> + Acc; +group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]); +group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]). record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> - State; + {no_confirm, State}; record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { @@ -451,14 +463,10 @@ record_confirm_message(#delivery{sender = ChPid, State = #q{guid_to_channel = GTC, q = #amqqueue{durable = true}}) -> - State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}; + {confirm, + State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}}; record_confirm_message(_Delivery, State) -> - State. - -ack_by_acktags(AckTags, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckdGuids, BQS1} = BQ:ack(AckTags, BQS), - confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}). + {no_confirm, State}. run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, @@ -473,12 +481,12 @@ attempt_delivery(#delivery{txn = none, sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, - State = #q{backing_queue = BQ, q = Q}) -> - NeedsConfirming = Message#basic_message.is_persistent andalso - Q#amqqueue.durable, - case NeedsConfirming of - false -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok + {NeedsConfirming, State = #q{backing_queue = BQ}}) -> + %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming + case {NeedsConfirming, MsgSeqNo} of + {_, undefined} -> ok; + {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); + {confirm, _} -> ok end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -490,31 +498,37 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered( AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = NeedsConfirming}, + needs_confirming = (NeedsConfirming =:= confirm)}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); + {Delivered, State1} = + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), + {Delivered, NeedsConfirming, State1}; attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + {NeedsConfirming, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> record_current_channel_tx(ChPid, Txn), {true, + NeedsConfirming, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {true, State1} -> + {true, _, State1} -> {true, State1}; - {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery, + {false, NeedsConfirming, State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}} -> + #delivery{message = Message} = Delivery, BQS1 = BQ:publish(Message, (message_properties(State)) #message_properties{ - needs_confirming = (MsgSeqNo =/= undefined)}, + needs_confirming = + (NeedsConfirming =:= confirm)}, BQS), {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} end. @@ -771,18 +785,19 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - delete_immediately -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _MsgIds, _ChPid} -> 7; - {reject, _MsgIds, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; - {unblock, _ChPid} -> 7; - _ -> 0 + update_ram_duration -> 8; + delete_immediately -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + {ack, _Txn, _MsgIds, _ChPid} -> 7; + {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + {maybe_run_queue_via_backing_queue, _Fun} -> 6; + _ -> 0 end. prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, @@ -823,7 +838,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, +handle_call({deliver_immediately, Delivery}, _From, State) -> %% Synchronous, "immediate" delivery mode %% @@ -838,17 +853,15 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, State1} = + {Delivered, _NeedsConfirming, State1} = attempt_delivery(Delivery, record_confirm_message(Delivery, State)), - reply(Delivered, case Delivered of - true -> State1; - false -> confirm_message(Message, State1) - end); + reply(Delivered, State1); -handle_call({deliver, Delivery}, _From, State) -> - %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Delivery, State), - reply(Delivered, NewState); +handle_call({deliver, Delivery}, From, State) -> + %% Synchronous, "mandatory" delivery mode. Reply asap. + gen_server2:reply(From, true), + {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), + noreply(NewState); handle_call({commit, Txn, ChPid}, From, State) -> NewState = commit_transaction(Txn, From, ChPid, State), @@ -881,7 +894,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, sets:add_element(AckTag, ChAckTags)}), State2; - false -> confirm_message(Message, State2) + false -> State2 end, Msg = {QName, self(), AckTag, IsDelivered, Message}, reply({ok, Remaining, Msg}, State3) @@ -1019,8 +1032,8 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - NewState = ack_by_acktags(AckTags, State), - {NewC, NewState}; + BQS1 = BQ:ack(AckTags, BQS), + {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, State#q{backing_queue_state = BQS1}} @@ -1029,7 +1042,9 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State1) end; -handle_cast({reject, AckTags, Requeue, ChPid}, State) -> +handle_cast({reject, AckTags, Requeue, ChPid}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -1038,7 +1053,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> ack_by_acktags(AckTags, State) + false -> BQS1 = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1} end) end; diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl new file mode 100644 index 00000000..0dc8e61b --- /dev/null +++ b/src/rabbit_auth_backend.erl @@ -0,0 +1,76 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_backend). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + %% A description proplist as with auth mechanisms, + %% exchanges. Currently unused. + {description, 0}, + + %% Check a user can log in, given a username and a proplist of + %% authentication information (e.g. [{password, Password}]). + %% + %% Possible responses: + %% {ok, User} + %% Authentication succeeded, and here's the user record. + %% {error, Error} + %% Something went wrong. Log and die. + %% {refused, Msg, Args} + %% Client failed authentication. Log and die. + {check_user_login, 2}, + + %% Given #user, vhost path and permission, can a user access a vhost? + %% Permission is read - learn of the existence of (only relevant for + %% management plugin) + %% or write - log in + %% + %% Possible responses: + %% true + %% false + %% {error, Error} + %% Something went wrong. Log and die. + {check_vhost_access, 3}, + + %% Given #user, resource and permission, can a user access a resource? + %% + %% Possible responses: + %% true + %% false + %% {error, Error} + %% Something went wrong. Log and die. + {check_resource_access, 3} + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl new file mode 100644 index 00000000..233e2b90 --- /dev/null +++ b/src/rabbit_auth_backend_internal.erl @@ -0,0 +1,347 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_backend_internal). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_backend). + +-export([description/0]). +-export([check_user_login/2, check_vhost_access/3, check_resource_access/3]). + +-export([add_user/2, delete_user/1, change_password/2, set_admin/1, + clear_admin/1, list_users/0, lookup_user/1, clear_password/1]). +-export([make_salt/0, check_password/2, change_password_hash/2, + hash_password/1]). +-export([set_permissions/5, clear_permissions/2, + list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, + list_user_vhost_permissions/2]). + +-include("rabbit_auth_backend_spec.hrl"). + +-ifdef(use_specs). + +-type(regexp() :: binary()). + +-spec(add_user/2 :: (rabbit_types:username(), rabbit_types:password()) -> 'ok'). +-spec(delete_user/1 :: (rabbit_types:username()) -> 'ok'). +-spec(change_password/2 :: (rabbit_types:username(), rabbit_types:password()) + -> 'ok'). +-spec(clear_password/1 :: (rabbit_types:username()) -> 'ok'). +-spec(make_salt/0 :: () -> binary()). +-spec(check_password/2 :: (rabbit_types:password(), + rabbit_types:password_hash()) -> boolean()). +-spec(change_password_hash/2 :: (rabbit_types:username(), + rabbit_types:password_hash()) -> 'ok'). +-spec(hash_password/1 :: (rabbit_types:password()) + -> rabbit_types:password_hash()). +-spec(set_admin/1 :: (rabbit_types:username()) -> 'ok'). +-spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok'). +-spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]). +-spec(lookup_user/1 :: (rabbit_types:username()) + -> rabbit_types:ok(rabbit_types:internal_user()) + | rabbit_types:error('not_found')). +-spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(), + regexp(), regexp(), regexp()) -> 'ok'). +-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) + -> 'ok'). +-spec(list_permissions/0 :: + () -> [{rabbit_types:username(), rabbit_types:vhost(), + regexp(), regexp(), regexp()}]). +-spec(list_vhost_permissions/1 :: + (rabbit_types:vhost()) -> [{rabbit_types:username(), + regexp(), regexp(), regexp()}]). +-spec(list_user_permissions/1 :: + (rabbit_types:username()) -> [{rabbit_types:vhost(), + regexp(), regexp(), regexp()}]). +-spec(list_user_vhost_permissions/2 :: + (rabbit_types:username(), rabbit_types:vhost()) + -> [{regexp(), regexp(), regexp()}]). + +-endif. + +%%---------------------------------------------------------------------------- + +%% Implementation of rabbit_auth_backend + +description() -> + [{name, <<"Internal">>}, + {description, <<"Internal user / password database">>}]. + +check_user_login(Username, []) -> + internal_check_user_login(Username, fun(_) -> true end); +check_user_login(Username, [{password, Password}]) -> + internal_check_user_login( + Username, + fun(#internal_user{password_hash = Hash}) -> + check_password(Password, Hash) + end); +check_user_login(Username, AuthProps) -> + exit({unknown_auth_props, Username, AuthProps}). + +internal_check_user_login(Username, Fun) -> + Refused = {refused, "user '~s' - invalid credentials", [Username]}, + case lookup_user(Username) of + {ok, User = #internal_user{is_admin = IsAdmin}} -> + case Fun(User) of + true -> {ok, #user{username = Username, + is_admin = IsAdmin, + auth_backend = ?MODULE, + impl = User}}; + _ -> Refused + end; + {error, not_found} -> + Refused + end. + +check_vhost_access(#user{is_admin = true}, _VHostPath, read) -> + true; + +check_vhost_access(#user{username = Username}, VHostPath, _) -> + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of + [] -> false; + [_R] -> true + end + end). + +check_resource_access(#user{username = Username}, + #resource{virtual_host = VHostPath, name = Name}, + Permission) -> + case mnesia:dirty_read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of + [] -> + false; + [#user_permission{permission = P}] -> + PermRegexp = + case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, + case re:run(Name, PermRegexp, [{capture, none}]) of + match -> true; + nomatch -> false + end + end. + +permission_index(configure) -> #permission.configure; +permission_index(write) -> #permission.write; +permission_index(read) -> #permission.read. + +%%---------------------------------------------------------------------------- +%% Manipulation of the user database + +add_user(Username, Password) -> + R = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_user, Username}) of + [] -> + ok = mnesia:write( + rabbit_user, + #internal_user{username = Username, + password_hash = + hash_password(Password), + is_admin = false}, + write); + _ -> + mnesia:abort({user_already_exists, Username}) + end + end), + rabbit_log:info("Created user ~p~n", [Username]), + R. + +delete_user(Username) -> + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + ok = mnesia:delete({rabbit_user, Username}), + [ok = mnesia:delete_object( + rabbit_user_permission, R, write) || + R <- mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = '_'}, + permission = '_'}, + write)], + ok + end)), + rabbit_log:info("Deleted user ~p~n", [Username]), + R. + +change_password(Username, Password) -> + change_password_hash(Username, hash_password(Password)). + +clear_password(Username) -> + change_password_hash(Username, <<"">>). + +change_password_hash(Username, PasswordHash) -> + R = update_user(Username, fun(User) -> + User#internal_user{ + password_hash = PasswordHash } + end), + rabbit_log:info("Changed password for user ~p~n", [Username]), + R. + +hash_password(Cleartext) -> + Salt = make_salt(), + Hash = salted_md5(Salt, Cleartext), + <<Salt/binary, Hash/binary>>. + +check_password(Cleartext, <<Salt:4/binary, Hash/binary>>) -> + Hash =:= salted_md5(Salt, Cleartext). + +make_salt() -> + {A1,A2,A3} = now(), + random:seed(A1, A2, A3), + Salt = random:uniform(16#ffffffff), + <<Salt:32>>. + +salted_md5(Salt, Cleartext) -> + Salted = <<Salt/binary, Cleartext/binary>>, + erlang:md5(Salted). + +set_admin(Username) -> + set_admin(Username, true). + +clear_admin(Username) -> + set_admin(Username, false). + +set_admin(Username, IsAdmin) -> + R = update_user(Username, fun(User) -> + User#internal_user{is_admin = IsAdmin} + end), + rabbit_log:info("Set user admin flag for user ~p to ~p~n", + [Username, IsAdmin]), + R. + +update_user(Username, Fun) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + {ok, User} = lookup_user(Username), + ok = mnesia:write(rabbit_user, Fun(User), write) + end)). + +list_users() -> + [{Username, IsAdmin} || + #internal_user{username = Username, is_admin = IsAdmin} <- + mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. + +lookup_user(Username) -> + rabbit_misc:dirty_read({rabbit_user, Username}). + +validate_regexp(RegexpBin) -> + Regexp = binary_to_list(RegexpBin), + case re:compile(Regexp) of + {ok, _} -> ok; + {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) + end. + +set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> + lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> ok = mnesia:write( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}}, + write) + end)). + + +clear_permissions(Username, VHostPath) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> + ok = mnesia:delete({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) + end)). + +list_permissions() -> + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(match_user_vhost('_', '_'))]. + +list_vhost_permissions(VHostPath) -> + [{Username, ConfigurePerm, WritePerm, ReadPerm} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(rabbit_vhost:with( + VHostPath, match_user_vhost('_', VHostPath)))]. + +list_user_permissions(Username) -> + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(rabbit_misc:with_user( + Username, match_user_vhost(Username, '_')))]. + +list_user_vhost_permissions(Username, VHostPath) -> + [{ConfigurePerm, WritePerm, ReadPerm} || + {_, _, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(rabbit_misc:with_user_and_vhost( + Username, VHostPath, + match_user_vhost(Username, VHostPath)))]. + +list_permissions(QueryThunk) -> + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + #user_permission{user_vhost = #user_vhost{username = Username, + virtual_host = VHostPath}, + permission = #permission{ configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction(QueryThunk)]. + +match_user_vhost(Username, VHostPath) -> + fun () -> mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = '_'}, + read) + end. diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl index 6572f786..1c4e5c15 100644 --- a/src/rabbit_auth_mechanism_external.erl +++ b/src/rabbit_auth_mechanism_external.erl @@ -81,7 +81,7 @@ handle_response(_Response, #state{username = Username}) -> {refused, _, _} = E -> E; _ -> - case rabbit_access_control:lookup_user(Username) of + case rabbit_access_control:check_user_login(Username, []) of {ok, User} -> {ok, User}; {error, not_found} -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index edafd52d..930e48e6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/7, do/2, do/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]). +-export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1]). @@ -47,10 +47,9 @@ -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, most_recently_declared_queue, + user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, - held_confirms, unconfirmed, queues_for_msg}). + confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -72,8 +71,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(FLUSH_CONFIRMS_INTERVAL, 1000). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -83,7 +80,7 @@ -type(channel_number() :: non_neg_integer()). -spec(start_link/7 :: - (channel_number(), pid(), pid(), rabbit_access_control:username(), + (channel_number(), pid(), pid(), rabbit_types:user(), rabbit_types:vhost(), pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). @@ -97,8 +94,7 @@ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). --spec(flush_confirms/1 :: (pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -111,9 +107,9 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, +start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, StartLimiterFun) -> - gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username, + gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> @@ -137,11 +133,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -confirm(Pid, MsgSeqNo) -> - gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). - -flush_confirms(Pid) -> - gen_server2:cast(Pid, flush_confirms). +confirm(Pid, MsgSeqNos) -> + gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). list() -> pg_local:get_members(rabbit_channels). @@ -168,7 +161,7 @@ emit_stats(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, +init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), @@ -184,7 +177,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, next_tag = 1, uncommitted_ack_q = queue:new(), unacked_message_q = queue:new(), - username = Username, + user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), @@ -192,9 +185,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, - publish_seqno = 0, - confirm_multiple = false, - held_confirms = gb_sets:new(), + publish_seqno = 1, unconfirmed = gb_sets:new(), queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -292,11 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast(flush_confirms, State) -> - {noreply, internal_flush_confirms(State)}; - -handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, confirm(MsgSeqNo, From, State)}. +handle_cast({confirm, MsgSeqNos, From}, State) -> + {noreply, confirm(MsgSeqNos, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM}) -> @@ -304,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> confirm(Msg, QPid, State0); + 0 -> confirm([Msg], QPid, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -312,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - State1 = internal_flush_confirms(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( State, [{idle_since, now()}]) end), StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), - {hibernate, State1#ch{stats_timer = StatsTimer1}}. + {hibernate, State#ch{stats_timer = StatsTimer1}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -371,7 +358,7 @@ return_queue_declare_ok(#resource{name = ActualName}, message_count = MessageCount, consumer_count = ConsumerCount}). -check_resource_access(Username, Resource, Perm) -> +check_resource_access(User, Resource, Perm) -> V = {Resource, Perm}, Cache = case get(permission_cache) of undefined -> []; @@ -381,7 +368,7 @@ check_resource_access(Username, Resource, Perm) -> case lists:member(V, Cache) of true -> lists:delete(V, Cache); false -> ok = rabbit_access_control:check_resource_access( - Username, Resource, Perm), + User, Resource, Perm), lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1) end, put(permission_cache, [V | CacheTail]), @@ -391,14 +378,25 @@ clear_permission_cache() -> erase(permission_cache), ok. -check_configure_permitted(Resource, #ch{username = Username}) -> - check_resource_access(Username, Resource, configure). +check_configure_permitted(Resource, #ch{user = User}) -> + check_resource_access(User, Resource, configure). -check_write_permitted(Resource, #ch{username = Username}) -> - check_resource_access(Username, Resource, write). +check_write_permitted(Resource, #ch{user = User}) -> + check_resource_access(User, Resource, write). -check_read_permitted(Resource, #ch{username = Username}) -> - check_resource_access(Username, Resource, read). +check_read_permitted(Resource, #ch{user = User}) -> + check_resource_access(User, Resource, read). + +check_user_id_header(#'P_basic'{user_id = undefined}, _) -> + ok; +check_user_id_header(#'P_basic'{user_id = Username}, + #ch{user = #user{username = Username}}) -> + ok; +check_user_id_header(#'P_basic'{user_id = Claimed}, + #ch{user = #user{username = Actual}}) -> + rabbit_misc:protocol_error( + precondition_failed, "user_id property set to '~s' but " + "authenticated user was '~s'", [Claimed, Actual]). check_internal_exchange(#exchange{name = Name, internal = true}) -> rabbit_misc:protocol_error(access_refused, @@ -473,51 +471,39 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -confirm(undefined, _QPid, State) -> +confirm([], _QPid, State) -> State; -confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> +confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) -> State; -confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{ - delivery_tag = MSN}), - State1 - end, State); -confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{held_confirms = As}) -> - start_confirm_timer( - State1#ch{held_confirms = gb_sets:add(MSN, As)}) - end, State). - -do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, - State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> - %% clears references to MsgSeqNo and does ConfirmFun - case gb_sets:is_element(MsgSeqNo, UC) of - true -> - Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), - case QPid of - undefined -> - ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1}); - _ -> - {ok, Qs} = dict:find(MsgSeqNo, QFM), - Qs1 = sets:del_element(QPid, Qs), - case sets:size(Qs1) of - 0 -> ConfirmFun(MsgSeqNo, - State#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM), - unconfirmed = Unconfirmed1}); - _ -> State#ch{queues_for_msg = - dict:store(MsgSeqNo, Qs1, QFM)} - end - end; - false -> - State - end. +confirm(MsgSeqNos, undefined, State = #ch{unconfirmed = UC, + queues_for_msg = QFM}) -> + MsgSeqNos1 = [MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)], + MS = gb_sets:from_list(MsgSeqNos), + QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM), + send_confirms(MsgSeqNos1, State#ch{unconfirmed = gb_sets:difference(UC, MS), + queues_for_msg = QFM1}); +confirm(MsgSeqNos, QPid, State) -> + {DoneMessages, State1} = + lists:foldl( + fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0, + queues_for_msg = QFM0}}) -> + case gb_sets:is_element(MsgSeqNo, UC0) of + false -> {DMs, State0}; + true -> {ok, Qs} = dict:find(MsgSeqNo, QFM0), + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> {[MsgSeqNo | DMs], + State0#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM0), + unconfirmed = + gb_sets:delete(MsgSeqNo, UC0)}}; + _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), + {DMs, State0#ch{queues_for_msg = QFM1}} + end + end + end, {[], State}, MsgSeqNos), + send_confirms(DoneMessages, State1). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -551,6 +537,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), + check_user_id_header(DecodedContent#content.properties, State), IsPersistent = is_message_persistent(DecodedContent), {MsgSeqNo, State1} = case ConfirmEnabled of @@ -998,20 +985,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) rabbit_misc:protocol_error( precondition_failed, "cannot switch from tx to confirm mode", []); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = false}) -> - return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, +handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> + return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = true, - confirm_multiple = Multiple}) -> - return_ok(State, NoWait, #'confirm.select_ok'{}); - -handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot change confirm_multiple setting", []); - handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -1241,12 +1218,12 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(routed, [], MsgSeqNo, _, State) -> - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, @@ -1260,47 +1237,28 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -start_confirm_timer(State = #ch{confirm_tref = undefined}) -> - {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, - ?MODULE, flush_confirms, [self()]), - State#ch{confirm_tref = TRef}; -start_confirm_timer(State) -> - State. - -stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> +send_confirms([], State) -> State; -stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#ch{confirm_tref = undefined}. - -internal_flush_confirms(State = #ch{writer_pid = WriterPid, - held_confirms = Cs}) -> - case gb_sets:is_empty(Cs) of - true -> State#ch{confirm_tref = undefined}; - false -> [First | Rest] = gb_sets:to_list(Cs), - {Mult, Inds} = find_consecutive_sequence(First, Rest), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Mult, multiple = true}), - ok = lists:foldl( - fun(T, ok) -> rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = T}) - end, ok, Inds), - State#ch{held_confirms = gb_sets:new(), - confirm_tref = undefined} - end. - -%% Find longest sequence of consecutive numbers at the beginning. -find_consecutive_sequence(Last, []) -> - {Last, []}; -find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> - find_consecutive_sequence(N, Ns); -find_consecutive_sequence(Last, Ns) -> - {Last, Ns}. +send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + SCs = lists:usort(Cs), + CutOff = case gb_sets:is_empty(UC) of + true -> lists:last(SCs) + 1; + false -> gb_sets:smallest(UC) + end, + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), + case Ms of + [] -> ok; + _ -> ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), + multiple = true}) + end, + ok = lists:foldl(fun(T, ok) -> + rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = T}) + end, ok, Ss), + State. -terminate(State) -> - stop_confirm_timer(State), +terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). @@ -1309,7 +1267,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid; i(number, #ch{channel = Channel}) -> Channel; -i(user, #ch{username = Username}) -> Username; +i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 747d23d3..9f50176d 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -48,7 +48,7 @@ -type(start_link_args() :: {rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_access_control:username(), rabbit_types:vhost(), pid()}). + rabbit_types:user(), rabbit_types:vhost(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -56,7 +56,7 @@ %%---------------------------------------------------------------------------- -start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, +start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = @@ -69,7 +69,7 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, Username, VHost, + [Channel, ReaderPid, WriterPid, User, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index ff3995b5..a6b1f7fa 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -78,4 +78,3 @@ reader(Pid) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. - diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index df55d961..8a3275bc 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -201,48 +201,48 @@ action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> action(add_user, Node, Args = [Username, _Password], _Opts, Inform) -> Inform("Creating user ~p", [Username]), - call(Node, {rabbit_access_control, add_user, Args}); + call(Node, {rabbit_auth_backend_internal, add_user, Args}); action(delete_user, Node, Args = [_Username], _Opts, Inform) -> Inform("Deleting user ~p", Args), - call(Node, {rabbit_access_control, delete_user, Args}); + call(Node, {rabbit_auth_backend_internal, delete_user, Args}); action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> Inform("Changing password for user ~p", [Username]), - call(Node, {rabbit_access_control, change_password, Args}); + call(Node, {rabbit_auth_backend_internal, change_password, Args}); action(clear_password, Node, Args = [Username], _Opts, Inform) -> Inform("Clearing password for user ~p", [Username]), - call(Node, {rabbit_access_control, clear_password, Args}); + call(Node, {rabbit_auth_backend_internal, clear_password, Args}); action(set_admin, Node, [Username], _Opts, Inform) -> Inform("Setting administrative status for user ~p", [Username]), - call(Node, {rabbit_access_control, set_admin, [Username]}); + call(Node, {rabbit_auth_backend_internal, set_admin, [Username]}); action(clear_admin, Node, [Username], _Opts, Inform) -> Inform("Clearing administrative status for user ~p", [Username]), - call(Node, {rabbit_access_control, clear_admin, [Username]}); + call(Node, {rabbit_auth_backend_internal, clear_admin, [Username]}); action(list_users, Node, [], _Opts, Inform) -> Inform("Listing users", []), - display_list(call(Node, {rabbit_access_control, list_users, []})); + display_list(call(Node, {rabbit_auth_backend_internal, list_users, []})); action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Creating vhost ~p", Args), - call(Node, {rabbit_access_control, add_vhost, Args}); + call(Node, {rabbit_vhost, add, Args}); action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Deleting vhost ~p", Args), - call(Node, {rabbit_access_control, delete_vhost, Args}); + call(Node, {rabbit_vhost, delete, Args}); action(list_vhosts, Node, [], _Opts, Inform) -> Inform("Listing vhosts", []), - display_list(call(Node, {rabbit_access_control, list_vhosts, []})); + display_list(call(Node, {rabbit_vhost, list, []})); action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) -> Inform("Listing permissions for user ~p", Args), - display_list(call(Node, {rabbit_access_control, list_user_permissions, - Args})); + display_list(call(Node, {rabbit_auth_backend_internal, + list_user_permissions, Args})); action(list_queues, Node, Args, Opts, Inform) -> Inform("Listing queues", []), @@ -296,19 +296,20 @@ action(list_consumers, Node, _Args, Opts, Inform) -> action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), - call(Node, {rabbit_access_control, set_permissions, + call(Node, {rabbit_auth_backend_internal, set_permissions, [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, [Username], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), - call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]}); + call(Node, {rabbit_auth_backend_internal, clear_permissions, + [Username, VHost]}); action(list_permissions, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost ~p", [VHost]), - display_list(call(Node, {rabbit_access_control, list_vhost_permissions, - [VHost]})). + display_list(call(Node, {rabbit_auth_backend_internal, + list_vhost_permissions, [VHost]})). default_if_empty(List, Default) when is_list(List) -> if List == [] -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 06ba319b..15ba787a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,7 +46,7 @@ -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). --export([with_user/2, with_vhost/2, with_user_and_vhost/3]). +-export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). @@ -72,7 +72,7 @@ -ifdef(use_specs). --export_type([resource_name/0]). +-export_type([resource_name/0, thunk/1]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -137,10 +137,9 @@ (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(with_user/2 :: (rabbit_access_control:username(), thunk(A)) -> A). --spec(with_vhost/2 :: (rabbit_types:vhost(), thunk(A)) -> A). +-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: - (rabbit_access_control:username(), rabbit_types:vhost(), thunk(A)) + (rabbit_types:username(), rabbit_types:vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). @@ -366,19 +365,8 @@ with_user(Username, Thunk) -> end end. -with_vhost(VHostPath, Thunk) -> - fun () -> - case mnesia:read({rabbit_vhost, VHostPath}) of - [] -> - mnesia:abort({no_such_vhost, VHostPath}); - [_V] -> - Thunk() - end - end. - with_user_and_vhost(Username, VHostPath, Thunk) -> - with_user(Username, with_vhost(VHostPath, Thunk)). - + with_user(Username, rabbit_vhost:with(VHostPath, Thunk)). execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 11f5e410..38cc82a6 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -163,10 +163,10 @@ nodes_of_type(Type) -> table_definitions() -> [{rabbit_user, - [{record_name, user}, - {attributes, record_info(fields, user)}, + [{record_name, internal_user}, + {attributes, record_info(fields, internal_user)}, {disc_copies, [node()]}, - {match, #user{_='_'}}]}, + {match, #internal_user{_='_'}}]}, {rabbit_user_permission, [{record_name, user_permission}, {attributes, record_info(fields, user_permission)}, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2e1834c7..deb62eb2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -81,6 +81,7 @@ file_summary_ets, %% tid of the file summary table dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table + dying_clients, %% set of dying clients client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? @@ -306,6 +307,17 @@ %% sure that reads are not attempted from files which are in the %% process of being garbage collected. %% +%% When a message is removed, its reference count is decremented. Even +%% if the reference count becomes 0, its entry is not removed. This is +%% because in the event of the same message being sent to several +%% different queues, there is the possibility of one queue writing and +%% removing the message before other queues write it at all. Thus +%% accomodating 0-reference counts allows us to avoid unnecessary +%% writes here. Of course, there are complications: the file to which +%% the message has already been written could be locked pending +%% deletion or GC, which means we have to rewrite the message as the +%% original copy will now be lost. +%% %% The server automatically defers reads, removes and contains calls %% that occur which refer to files which are currently being %% GC'd. Contains calls are only deferred in order to ensure they do @@ -323,6 +335,55 @@ %% heavily overloaded, clients can still write and read messages with %% very low latency and not block at all. %% +%% Clients of the msg_store are required to register before using the +%% msg_store. This provides them with the necessary client-side state +%% to allow them to directly access the various caches and files. When +%% they terminate, they should deregister. They can do this by calling +%% either client_terminate/1 or client_delete_and_terminate/1. The +%% differences are: (a) client_terminate is synchronous. As a result, +%% if the msg_store is badly overloaded and has lots of in-flight +%% writes and removes to process, this will take some time to +%% return. However, once it does return, you can be sure that all the +%% actions you've issued to the msg_store have been processed. (b) Not +%% only is client_delete_and_terminate/1 asynchronous, but it also +%% permits writes and subsequent removes from the current +%% (terminating) client which are still in flight to be safely +%% ignored. Thus from the point of view of the msg_store itself, and +%% all from the same client: +%% +%% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N +%% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 --> +%% +%% The client obviously sent T after all the other messages (up to +%% W4), but because the msg_store prioritises messages, the T can be +%% promoted and thus received early. +%% +%% Thus at the point of the msg_store receiving T, we have messages 1 +%% and 2 with a refcount of 1. After T, W3 will be ignored because +%% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be +%% ignored because the messages that they refer to were already known +%% to the msg_store prior to T. However, it can be a little more +%% complex: after the first R2, the refcount of msg 2 is 0. At that +%% point, if a GC occurs or file deletion, msg 2 could vanish, which +%% would then mean that the subsequent W2 and R2 are then ignored. +%% +%% The use case then for client_delete_and_terminate/1 is if the +%% client wishes to remove everything it's written to the msg_store: +%% it issues removes for all messages it's written and not removed, +%% and then calls client_delete_and_terminate/1. At that point, any +%% in-flight writes (and subsequent removes) can be ignored, but +%% removes and writes for messages the msg_store already knows about +%% will continue to be processed normally (which will normally just +%% involve modifying the reference count, which is fast). Thus we save +%% disk bandwidth for writes which are going to be immediately removed +%% again by the the terminating client. +%% +%% We use a separate set to keep track of the dying clients in order +%% to keep that set, which is inspected on every write and remove, as +%% small as possible. Inspecting client_refs - the set of all clients +%% - would degrade performance with many healthy clients and few, if +%% any, dying clients, which is the typical case. +%% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -361,6 +422,7 @@ client_terminate(CState = #client_msstate { client_ref = Ref }) -> client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), + ok = server_cast(CState, {client_dying, Ref}), ok = server_cast(CState, {client_delete, Ref}). client_ref(#client_msstate { client_ref = Ref }) -> Ref. @@ -598,6 +660,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, + dying_clients = sets:new(), client_refs = ClientRefs1, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, @@ -643,6 +706,7 @@ prioritise_cast(Msg, _State) -> {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; {set_maximum_since_use, _Age} -> 8; + {client_dying, _Pid} -> 7; _ -> 0 end. @@ -681,15 +745,22 @@ handle_call({contains, Guid}, From, State) -> State1 = contains_message(Guid, From, State), noreply(State1). +handle_cast({client_dying, CRef}, + State = #msstate { dying_clients = DyingClients }) -> + DyingClients1 = sets:add_element(CRef, DyingClients), + write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); + handle_cast({client_delete, CRef}, - State = #msstate { client_refs = ClientRefs }) -> - State1 = clear_client_callback(CRef, State), - noreply(State1 #msstate { - client_refs = sets:del_element(CRef, ClientRefs) }); + State = #msstate { client_refs = ClientRefs, + dying_clients = DyingClients }) -> + State1 = clear_client_callback( + CRef, State #msstate { + client_refs = sets:del_element(CRef, ClientRefs), + dying_clients = sets:del_element(CRef, DyingClients) }), + noreply(remove_message(CRef, CRef, State1)); handle_cast({write, CRef, Guid}, - State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, + State = #msstate { file_summary_ets = FileSummaryEts, current_file = CurFile, cur_file_cache_ets = CurFileCacheEts, client_ondisk_callback = CODC, @@ -705,41 +776,47 @@ handle_cast({write, CRef, Guid}, error -> CTG end, State1 = State #msstate { cref_to_guids = CTG1 }, - case index_lookup(Guid, State1) of - not_found -> + case should_mask_action(CRef, Guid, State1) of + {true, _Location} -> + noreply(State1); + {false, not_found} -> write_message(Guid, Msg, State1); - #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> - case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> + {Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize }} -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> ok = index_delete(Guid, State1), write_message(Guid, Msg, State1); - [#file_summary {}] -> - ok = index_update_ref_count(Guid, 1, State1), - [_] = ets:update_counter( - FileSummaryEts, File, - [{#file_summary.valid_total_size, TotalSize}]), - noreply(State1 #msstate { - sum_valid_data = SumValid + TotalSize }) + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client death + %% message, but as it is being GC'd currently, + %% we'll have to write a new copy, which will then + %% be younger, so ignore this write. + noreply(State1); + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + noreply(adjust_valid_total_size(File, TotalSize, State)) end; - #msg_location { ref_count = RefCount, file = File } -> + {_Mask, #msg_location { ref_count = RefCount, file = File }} -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC ok = index_update_ref_count(Guid, RefCount + 1, State1), CTG2 = case {dict:find(CRef, CODC), File} of {{ok, _}, CurFile} -> CTG1; - {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)), + {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid), + written), CTG; _ -> CTG1 end, - noreply(State #msstate { cref_to_guids = CTG2 }) + noreply(State1 #msstate { cref_to_guids = CTG2 }) end; handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( - fun (Guid, State2) -> remove_message(Guid, State2) end, + fun (Guid, State2) -> remove_message(Guid, CRef, State2) end, State, Guids), - State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1), - noreply(maybe_compact(State2)); + noreply(maybe_compact( + client_confirm(CRef, gb_sets:from_list(Guids), removed, State1))); handle_cast({release, Guids}, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -861,9 +938,9 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs, - cref_to_guids = CTG }) -> +internal_sync(State = #msstate { current_file_handle = CurHdl, + on_sync = Syncs, + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), CGs = dict:fold(fun (CRef, Guids, NS) -> case gb_sets:is_empty(Guids) of @@ -871,14 +948,14 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, false -> [{CRef, Guids} | NS] end end, [], CTG), - if Syncs =:= [] andalso CGs =:= [] -> ok; - true -> file_handle_cache:sync(CurHdl) + case {Syncs, CGs} of + {[], []} -> ok; + _ -> file_handle_cache:sync(CurHdl) end, - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs], + [K() || K <- lists:reverse(Syncs)], + [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. - write_message(Guid, Msg, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, @@ -990,34 +1067,43 @@ contains_message(Guid, From, end end. -remove_message(Guid, State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> - #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize } = - index_lookup_positive_ref_count(Guid, State), - %% only update field, otherwise bad interaction with concurrent GC - Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, - case RefCount of - %% don't remove from CUR_FILE_CACHE_ETS_NAME here because - %% there may be further writes in the mailbox for the same - %% msg. - 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), - case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true } ] -> - add_to_pending_gc_completion({remove, Guid}, File, State); - [#file_summary {}] -> +remove_message(Guid, CRef, + State = #msstate { file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts }) -> + case should_mask_action(CRef, Guid, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this msg + %% whilst it's being GC'd. ASSERTION: + %% [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} when RefCount > 0 -> + %% only update field, otherwise bad interaction with + %% concurrent GC + Dec = + fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, + case RefCount of + %% don't remove from CUR_FILE_CACHE_ETS_NAME here + %% because there may be further writes in the mailbox + %% for the same msg. + 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), + case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, Guid, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + delete_file_if_empty( + File, adjust_valid_total_size(File, -TotalSize, + State)) + end; + _ -> ok = decrement_cache(DedupCacheEts, Guid), ok = Dec(), - [_] = ets:update_counter( - FileSummaryEts, File, - [{#file_summary.valid_total_size, -TotalSize}]), - delete_file_if_empty( - File, State #msstate { - sum_valid_data = SumValid - TotalSize }) - end; - _ -> ok = decrement_cache(DedupCacheEts, Guid), - ok = Dec(), - State + State + end end. add_to_pending_gc_completion( @@ -1039,8 +1125,8 @@ run_pending_action({read, Guid, From}, State) -> read_message(Guid, From, State); run_pending_action({contains, Guid, From}, State) -> contains_message(Guid, From, State); -run_pending_action({remove, Guid}, State) -> - remove_message(Guid, State). +run_pending_action({remove, Guid, CRef}, State) -> + remove_message(Guid, CRef, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> try @@ -1051,15 +1137,22 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). +adjust_valid_total_size(File, Delta, State = #msstate { + sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts }) -> + [_] = ets:update_counter(FileSummaryEts, File, + [{#file_summary.valid_total_size, Delta}]), + State #msstate { sum_valid_data = SumValid + Delta }. + orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). -client_confirm(CRef, Guids, +client_confirm(CRef, Guids, ActionTaken, State = #msstate { client_ondisk_callback = CODC, cref_to_guids = CTG }) -> case dict:find(CRef, CODC) of - {ok, Fun} -> Fun(Guids), + {ok, Fun} -> Fun(Guids, ActionTaken), CTG1 = case dict:find(CRef, CTG) of {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), @@ -1073,6 +1166,29 @@ client_confirm(CRef, Guids, error -> State end. +%% Detect whether the Guid is older or younger than the client's death +%% msg (if there is one). If the msg is older than the client death +%% msg, and it has a 0 ref_count we must only alter the ref_count, not +%% rewrite the msg - rewriting it would make it younger than the death +%% msg and thus should be ignored. Note that this (correctly) returns +%% false when testing to remove the death msg itself. +should_mask_action(CRef, Guid, + State = #msstate { dying_clients = DyingClients }) -> + case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of + {false, Location} -> + {false, Location}; + {true, not_found} -> + {true, not_found}; + {true, #msg_location { file = File, offset = Offset, + ref_count = RefCount } = Location} -> + #msg_location { file = DeathFile, offset = DeathOffset } = + index_lookup(CRef, State), + {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of + {true, _} -> true; + {false, 0} -> false_if_increment; + {false, _} -> false + end, Location} + end. %%---------------------------------------------------------------------------- %% file helper functions diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 89954b06..c6a083bb 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -32,7 +32,7 @@ -module(rabbit_net). -include("rabbit.hrl"). --export([is_ssl/1, controlling_process/2, getstat/2, +-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, async_recv/3, port_command/2, send/2, close/1, sockname/1, peername/1, peercert/1]). @@ -50,6 +50,9 @@ -type(socket() :: port() | #ssl_socket{}). -spec(is_ssl/1 :: (socket()) -> boolean()). +-spec(ssl_info/1 :: (socket()) + -> 'nossl' | ok_val_or_error( + {atom(), {atom(), atom(), atom()}})). -spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()). -spec(getstat/2 :: (socket(), [stat_option()]) @@ -77,6 +80,11 @@ is_ssl(Sock) -> ?IS_SSL(Sock). +ssl_info(Sock) when ?IS_SSL(Sock) -> + ssl:connection_info(Sock#ssl_socket.ssl); +ssl_info(_Sock) -> + nossl. + controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> ssl:controlling_process(Sock#ssl_socket.ssl, Pid); controlling_process(Sock, Pid) when is_port(Sock) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76c0a4ef..2162104f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -33,7 +33,7 @@ -export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, + publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). @@ -297,11 +297,12 @@ deliver(SeqIds, State) -> ack(SeqIds, State) -> deliver_or_ack(ack, SeqIds, State). -sync([], State) -> - State; -sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> - State; -sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> +%% This is only called when there are outstanding confirms and the +%% queue is idle. +sync(State = #qistate { unsynced_guids = Guids }) -> + sync_if([] =/= Guids, State). + +sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack in %% the transaction. Ideally we should go through these seqids and %% only sync the journal if the pubs or acks appear in the @@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% the variable queue publishes and acks to the qi, and then %% syncs, all in one operation, there is no possibility of the %% seqids not being in the journal, provided the transaction isn't - %% emptied (handled above anyway). - ok = file_handle_cache:sync(JournalHdl), - notify_sync(State). + %% emptied (handled by sync_if anyway). + sync_if([] =/= SeqIds, State). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). +sync_if(false, State) -> + State; +sync_if(_Bool, State = #qistate { journal_handle = undefined }) -> + State; +sync_if(true, State = #qistate { journal_handle = JournalHdl }) -> + ok = file_handle_cache:sync(JournalHdl), + notify_sync(State). + notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> OnSyncFun(gb_sets:from_list(UG)), State #qistate { unsynced_guids = [] }. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 74e5fc77..9d44cabd 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -65,6 +65,8 @@ -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, + ssl_protocol, ssl_key_exchange, + ssl_cipher, ssl_hash, protocol, user, vhost, timeout, frame_max, client_properties]). @@ -742,17 +744,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - SendFun = - fun() -> - Frame = rabbit_binary_generator:build_heartbeat_frame(), - catch rabbit_net:send(Sock, Frame) - end, - + Frame = rabbit_binary_generator:build_heartbeat_frame(), + SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), - ReceiveFun = - fun() -> - Parent ! timeout - end, + ReceiveFun = fun() -> Parent ! timeout end, Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, @@ -879,6 +874,14 @@ i(peer_port, #v1{sock = Sock}) -> socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock); i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock); +i(ssl_protocol, #v1{sock = Sock}) -> + ssl_info(fun ({P, _}) -> P end, Sock); +i(ssl_key_exchange, #v1{sock = Sock}) -> + ssl_info(fun ({_, {K, _, _}}) -> K end, Sock); +i(ssl_cipher, #v1{sock = Sock}) -> + ssl_info(fun ({_, {_, C, _}}) -> C end, Sock); +i(ssl_hash, #v1{sock = Sock}) -> + ssl_info(fun ({_, {_, _, H}}) -> H end, Sock); i(peer_cert_issuer, #v1{sock = Sock}) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock); i(peer_cert_subject, #v1{sock = Sock}) -> @@ -929,6 +932,13 @@ socket_info(Get, Select) -> {error, _} -> '' end. +ssl_info(F, Sock) -> + case rabbit_net:ssl_info(Sock) of + nossl -> ''; + {error, _} -> ''; + {ok, Info} -> F(Info) + end. + cert_info(F, Sock) -> case rabbit_net:peercert(Sock) of nossl -> ''; @@ -943,12 +953,12 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> channel_sup_sup_pid = ChanSupSup, connection = #connection{protocol = Protocol, frame_max = FrameMax, - user = #user{username = Username}, + user = User, vhost = VHost}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, - self(), Username, VHost, Collector}), + self(), User, VHost, Collector}), erlang:monitor(process, ChPid), put({channel, Channel}, {ChPid, AState}), put({ch_pid, ChPid}, Channel), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index eca748a9..d913092c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1030,7 +1030,7 @@ test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - <<"user">>, <<"/">>, self(), + user(<<"user">>), <<"/">>, self(), fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- @@ -1090,7 +1090,7 @@ test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, - <<"guest">>, <<"/">>, self(), + user(<<"guest">>), <<"/">>, self(), fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok @@ -1098,6 +1098,13 @@ test_spawn(Receiver) -> end, {Writer, Ch}. +user(Username) -> + #user{username = Username, + is_admin = true, + auth_backend = rabbit_auth_backend_internal, + impl = #internal_user{username = Username, + is_admin = true}}. + test_statistics_receiver(Pid) -> receive shutdown -> @@ -1689,7 +1696,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> false -> ?TRANSIENT_MSG_STORE end, MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined), - {A, B} = + {A, B = [{_SeqId, LastGuidWritten} | _]} = lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> Guid = rabbit_guid:guid(), @@ -1698,6 +1705,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> ok = rabbit_msg_store:write(Guid, Guid, MSCState), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]} end, {Qi, []}, SeqIds), + %% do this just to force all of the publishes through to the msg_store: + true = rabbit_msg_store:contains(LastGuidWritten, MSCState), ok = rabbit_msg_store:client_delete_and_terminate(MSCState), {A, B}. @@ -1881,7 +1890,7 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), VQ = rabbit_variable_queue:init(test_queue(), true, false, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1983,7 +1992,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), + VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1993,7 +2002,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2027,7 +2036,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2057,7 +2066,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2074,7 +2083,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2105,7 +2114,7 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = rabbit_variable_queue:init(QName, true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), @@ -2167,3 +2176,4 @@ test_configurable_server_properties() -> passed. nop(_) -> ok. +nop(_, _) -> ok. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 548014be..70d18d7a 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -42,8 +42,9 @@ vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, - connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, - ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, + connection/0, protocol/0, user/0, internal_user/0, + username/0, password/0, password_hash/0, ok/1, error/1, + ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, connection_exit/0]). -type(channel_exit() :: no_return()). @@ -151,9 +152,19 @@ -type(protocol() :: rabbit_framing:protocol()). -type(user() :: - #user{username :: rabbit_access_control:username(), - password_hash :: rabbit_access_control:password_hash(), - is_admin :: boolean()}). + #user{username :: username(), + is_admin :: boolean(), + auth_backend :: atom(), + impl :: any()}). + +-type(internal_user() :: + #internal_user{username :: username(), + password_hash :: password_hash(), + is_admin :: boolean()}). + +-type(username() :: binary()). +-type(password() :: binary()). +-type(password_hash() :: binary()). -type(ok(A) :: {'ok', A}). -type(error(A) :: {'error', A}). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 7848c848..b5ff2b12 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -28,6 +28,7 @@ -rabbit_upgrade({hash_passwords, []}). -rabbit_upgrade({add_ip_to_listener, []}). -rabbit_upgrade({internal_exchanges, []}). +-rabbit_upgrade({user_to_internal_user, [hash_passwords]}). %% ------------------------------------------------------------------- @@ -37,6 +38,7 @@ -spec(hash_passwords/0 :: () -> 'ok'). -spec(add_ip_to_listener/0 :: () -> 'ok'). -spec(internal_exchanges/0 :: () -> 'ok'). +-spec(user_to_internal_user/0 :: () -> 'ok'). -endif. @@ -60,7 +62,7 @@ hash_passwords() -> mnesia( rabbit_user, fun ({user, Username, Password, IsAdmin}) -> - Hash = rabbit_access_control:hash_password(Password), + Hash = rabbit_auth_backend_internal:hash_password(Password), {user, Username, Hash, IsAdmin} end, [username, password_hash, is_admin]). @@ -85,8 +87,21 @@ internal_exchanges() -> || T <- Tables ], ok. +user_to_internal_user() -> + mnesia( + rabbit_user, + fun({user, Username, PasswordHash, IsAdmin}) -> + {internal_user, Username, PasswordHash, IsAdmin} + end, + [username, password_hash, is_admin], internal_user). + %%-------------------------------------------------------------------- mnesia(TableName, Fun, FieldList) -> {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), ok. + +mnesia(TableName, Fun, FieldList, NewRecordName) -> + {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, + NewRecordName), + ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 565c61e7..b6681c6b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -412,7 +412,9 @@ stop_msg_store() -> init(QueueName, IsDurable, Recover) -> Self = self(), init(QueueName, IsDurable, Recover, - fun (Guids) -> msgs_written_to_disk(Self, Guids) end, + fun (Guids, ActionTaken) -> + msgs_written_to_disk(Self, Guids, ActionTaken) + end, fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> @@ -519,7 +521,9 @@ publish(Msg, MsgProps, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> +publish_delivered(false, #basic_message { guid = Guid }, + _MsgProps, State = #vqstate { len = 0 }) -> + blind_confirm(self(), gb_sets:singleton(Guid)), {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, @@ -531,20 +535,20 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - unconfirmed = Unconfirmed }) -> + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), + UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), {SeqId, a(reduce_memory_use( State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, persistent_count = PCount1, - unconfirmed = Unconfirmed1 }))}. + unconfirmed = UC1 }))}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -654,15 +658,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - {Guids, State1} = - ack(fun msg_store_remove/3, - fun ({_IsPersistent, Guid, _MsgProps}, State1) -> - remove_confirms(gb_sets:singleton(Guid), State1); - (#msg_status{msg = #basic_message { guid = Guid }}, State1) -> - remove_confirms(gb_sets:singleton(Guid), State1) - end, - AckTags, State), - {Guids, a(State1)}. + a(ack(fun msg_store_remove/3, + fun (_, State0) -> State0 end, + AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, @@ -712,7 +710,7 @@ tx_commit(Txn, Fun, MsgPropsFun, end)}. requeue(AckTags, MsgPropsFun, State) -> - {_Guids, State1} = + a(reduce_memory_use( ack(fun msg_store_release/3, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), @@ -727,8 +725,7 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State), - a(reduce_memory_use(State1)). + AckTags, State))). len(#vqstate { len = Len }) -> Len. @@ -812,17 +809,22 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State), - Res; -needs_idle_timeout(_State) -> - true. +needs_idle_timeout(State = #vqstate { on_sync = OnSync, unconfirmed = UC }) -> + case {OnSync, gb_sets:is_empty(UC)} of + {?BLANK_SYNC, true} -> + {Res, _State} = reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State), + Res; + _ -> + true + end. -idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). +idle_timeout(State) -> + a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -1160,7 +1162,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - {_Guids, NewState} = ack(Acks, State), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = @@ -1172,7 +1173,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { {SeqId, State3} = publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, NewState}, Pubs), + end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1236,7 +1237,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, persistent_count = PCount, durable = IsDurable, ram_msg_count = RamMsgCount, - unconfirmed = Unconfirmed }) -> + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, @@ -1246,13 +1247,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), + UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, ram_msg_count = RamMsgCount + 1, - unconfirmed = Unconfirmed1 }}. + unconfirmed = UC1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> @@ -1323,7 +1324,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, GuidsByStore, _AllGuids} = + {PersistentSeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1342,9 +1343,9 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - {[], State}; + State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, GuidsByStore, AllGuids}, + {{PersistentSeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1364,24 +1365,21 @@ ack(MsgStoreFun, Fun, AckTags, State) -> || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - {lists:reverse(AllGuids), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }}. + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }. -accumulate_ack_init() -> {[], orddict:new(), []}. +accumulate_ack_init() -> {[], orddict:new()}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false, - guid = Guid }, - {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> - {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]}; + index_on_disk = false }, + {PersistentSeqIdsAcc, GuidsByStore}) -> + {PersistentSeqIdsAcc, GuidsByStore}; accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, - {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {PersistentSeqIdsAcc, GuidsByStore}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore), - [Guid | AllGuids]}. + rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1393,6 +1391,14 @@ find_persistent_count(LensByStore) -> %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- +confirm_commit_index(State = #vqstate { unconfirmed = UC, + index_state = IndexState }) -> + case gb_sets:is_empty(UC) of + true -> State; + false -> State #vqstate { + index_state = rabbit_queue_index:sync(IndexState) } + end. + remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> @@ -1403,7 +1409,13 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msgs_confirmed(GuidSet, State) -> {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. -msgs_written_to_disk(QPid, GuidSet) -> +blind_confirm(QPid, GuidSet) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun (State) -> msgs_confirmed(GuidSet, State) end). + +msgs_written_to_disk(QPid, GuidSet, removed) -> + blind_confirm(QPid, GuidSet); +msgs_written_to_disk(QPid, GuidSet, written) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( QPid, fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl new file mode 100644 index 00000000..f939a3fe --- /dev/null +++ b/src/rabbit_vhost.erl @@ -0,0 +1,122 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_vhost). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-export([add/1, delete/1, exists/1, list/0, with/2]). + +-ifdef(use_specs). + +-spec(add/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(delete/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(exists/1 :: (rabbit_types:vhost()) -> boolean()). +-spec(list/0 :: () -> [rabbit_types:vhost()]). +-spec(with/2 :: (rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +add(VHostPath) -> + R = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_vhost, VHostPath}) of + [] -> + ok = mnesia:write(rabbit_vhost, + #vhost{virtual_host = VHostPath}, + write), + [rabbit_exchange:declare( + rabbit_misc:r(VHostPath, exchange, Name), + Type, true, false, false, []) || + {Name,Type} <- + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}]], + ok; + [_] -> + mnesia:abort({vhost_already_exists, VHostPath}) + end + end), + rabbit_log:info("Added vhost ~p~n", [VHostPath]), + R. + +delete(VHostPath) -> + %%FIXME: We are forced to delete the queues outside the TX below + %%because queue deletion involves sending messages to the queue + %%process, which in turn results in further mnesia actions and + %%eventually the termination of that process. + lists:foreach(fun (Q) -> + {ok,_} = rabbit_amqqueue:delete(Q, false, false) + end, + rabbit_amqqueue:list(VHostPath)), + R = rabbit_misc:execute_mnesia_transaction( + with(VHostPath, fun () -> + ok = internal_delete(VHostPath) + end)), + rabbit_log:info("Deleted vhost ~p~n", [VHostPath]), + R. + +internal_delete(VHostPath) -> + lists:foreach(fun (#exchange{name = Name}) -> + ok = rabbit_exchange:delete(Name, false) + end, + rabbit_exchange:list(VHostPath)), + lists:foreach( + fun ({Username, _, _, _}) -> + ok = rabbit_auth_backend_internal:clear_permissions(Username, + VHostPath) + end, + rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)), + ok = mnesia:delete({rabbit_vhost, VHostPath}), + ok. + +exists(VHostPath) -> + mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. + +list() -> + mnesia:dirty_all_keys(rabbit_vhost). + +with(VHostPath, Thunk) -> + fun () -> + case mnesia:read({rabbit_vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [_V] -> + Thunk() + end + end. |