diff options
author | Emile Joubert <emile@rabbitmq.com> | 2010-09-14 15:21:40 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2010-09-14 15:21:40 +0100 |
commit | 2a292ea5aa042e25b6651f231c7595babdc60109 (patch) | |
tree | a950b75f13d0471ac4c15a6d5d72fb20ea7ae020 | |
parent | 32863c77239508e7e810e83b1294011fde140f0e (diff) | |
parent | 560269aa5abb0b7196c96990c0e070d729b1f2d1 (diff) | |
download | rabbitmq-server-2a292ea5aa042e25b6651f231c7595babdc60109.tar.gz |
Merged bug21528 into default
-rw-r--r-- | docs/html-to-website-xml.xsl | 2 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 49 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 89 | ||||
-rw-r--r-- | src/rabbit.erl | 5 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 129 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 76 | ||||
-rw-r--r-- | src/rabbit_channel_sup_sup.erl | 4 | ||||
-rw-r--r-- | src/rabbit_control.erl | 59 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 68 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 118 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 14 | ||||
-rw-r--r-- | src/rabbit_net.erl | 44 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 5 | ||||
-rw-r--r-- | src/rabbit_router.erl | 4 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
-rw-r--r-- | src/rabbit_types.erl | 7 | ||||
-rw-r--r-- | src/supervisor2.erl | 7 |
21 files changed, 403 insertions, 286 deletions
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index 662dbea0..c325bb5a 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -30,7 +30,7 @@ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. </p> <p> - <a href="manpages.html">See a list of all manual pages</a>. + <a href="../manpages.html">See a list of all manual pages</a>. </p> </xsl:when> <xsl:otherwise> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index be1ee70b..5179eb25 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -417,7 +417,8 @@ <screen role="example">rabbitmqctl add_user tonyg changeit</screen> <para role="example"> This command instructs the RabbitMQ broker to create a - user named <command>tonyg</command> with (initial) password + (non-administrative) user named <command>tonyg</command> with + (initial) password <command>changeit</command>. </para> </listitem> @@ -465,13 +466,57 @@ </varlistentry> <varlistentry> + <term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user whose administrative + status is to be set.</para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_admin tonyg</screen> + <para role="example"> + This command instructs the RabbitMQ broker to ensure the user + named <command>tonyg</command> is an administrator. This has no + effect when the user logs in via AMQP, but can be used to permit + the user to manage users, virtual hosts and permissions when the + user logs in via some other means (for example with the + management plugin). + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>clear_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user whose administrative + status is to be cleared.</para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_admin tonyg</screen> + <para role="example"> + This command instructs the RabbitMQ broker to ensure the user + named <command>tonyg</command> is not an administrator. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><cmdsynopsis><command>list_users</command></cmdsynopsis></term> <listitem> <para>Lists users</para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl list_users</screen> <para role="example"> - This command instructs the RabbitMQ broker to list all users. + This command instructs the RabbitMQ broker to list all + users. Each result row will contain the user name and + the administrator status of the user, in that order. </para> </listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 48e19ff8..4be09c5a 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -26,6 +26,7 @@ {queue_index_max_journal_entries, 262144}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, + {default_user_is_admin, true}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, {collect_statistics, none}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index b9abd788..24aa8d98 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -29,7 +29,7 @@ %% Contributor(s): ______________________________________. %% --record(user, {username, password}). +-record(user, {username, password, is_admin}). -record(permission, {scope, configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index a44f49a0..4afc66ac 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -1,7 +1,7 @@ Source: rabbitmq-server Section: net Priority: extra -Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com> +Maintainer: RabbitMQ Team <packaging@rabbitmq.com> Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc Standards-Version: 3.8.0 diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index aecfb096..d2830a25 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -34,13 +34,15 @@ %% A File Handle Cache %% %% This extends a subset of the functionality of the Erlang file -%% module. +%% module. In the below, we use "file handle" to specifically refer to +%% file handles, and "file descriptor" to refer to descriptors which +%% are not file handles, e.g. sockets. %% %% Some constraints %% 1) This supports one writer, multiple readers per file. Nothing %% else. %% 2) Do not open the same file from different processes. Bad things -%% may happen. +%% may happen, especially for writes. %% 3) Writes are all appends. You cannot write to the middle of a %% file, although you can truncate and then append if you want. %% 4) Although there is a write buffer, there is no read buffer. Feel @@ -49,10 +51,10 @@ %% %% Some benefits %% 1) You do not have to remember to call sync before close -%% 2) Buffering is much more flexible than with plain file module, and -%% you can control when the buffer gets flushed out. This means that -%% you can rely on reads-after-writes working, without having to call -%% the expensive sync. +%% 2) Buffering is much more flexible than with the plain file module, +%% and you can control when the buffer gets flushed out. This means +%% that you can rely on reads-after-writes working, without having to +%% call the expensive sync. %% 3) Unnecessary calls to position and sync get optimised out. %% 4) You can find out what your 'real' offset is, and what your %% 'virtual' offset is (i.e. where the hdl really is, and where it @@ -60,14 +62,19 @@ %% 5) You can find out what the offset was when you last sync'd. %% %% There is also a server component which serves to limit the number -%% of open file handles in a "soft" way - the server will never -%% prevent a client from opening a handle, but may immediately tell it -%% to close the handle. Thus you can set the limit to zero and it will -%% still all work correctly, it is just that effectively no caching -%% will take place. The operation of limiting is as follows: +%% of open file descriptors. This is a hard limit: the server +%% component will ensure that clients do not have more file +%% descriptors open than it's configured to allow. %% -%% On open and close, the client sends messages to the server -%% informing it of opens and closes. This allows the server to keep +%% On open, the client requests permission from the server to open the +%% required number of file handles. The server may ask the client to +%% close other file handles that it has open, or it may queue the +%% request and ask other clients to close file handles they have open +%% in order to satisfy the request. Requests are always satisfied in +%% the order they arrive, even if a latter request (for a small number +%% of file handles) can be satisfied before an earlier request (for a +%% larger number of file handles). On close, the client sends a +%% message to the server. These messages allow the server to keep %% track of the number of open handles. The client also keeps a %% gb_tree which is updated on every use of a file handle, mapping the %% time at which the file handle was last used (timestamp) to the @@ -81,21 +88,38 @@ %% Note that this data can go very out of date, by the client using %% the least recently used handle. %% -%% When the limit is reached, the server calculates the average age of -%% the last reported least recently used file handle of all the -%% clients. It then tells all the clients to close any handles not -%% used for longer than this average, by invoking the callback the -%% client registered. The client should receive this message and pass -%% it into set_maximum_since_use/1. However, it is highly possible -%% this age will be greater than the ages of all the handles the -%% client knows of because the client has used its file handles in the -%% mean time. Thus at this point the client reports to the server the +%% When the limit is exceeded (i.e. the number of open file handles is +%% at the limit and there are pending 'open' requests), the server +%% calculates the average age of the last reported least recently used +%% file handle of all the clients. It then tells all the clients to +%% close any handles not used for longer than this average, by +%% invoking the callback the client registered. The client should +%% receive this message and pass it into +%% set_maximum_since_use/1. However, it is highly possible this age +%% will be greater than the ages of all the handles the client knows +%% of because the client has used its file handles in the mean +%% time. Thus at this point the client reports to the server the %% current timestamp at which its least recently used file handle was %% last used. The server will check two seconds later that either it %% is back under the limit, in which case all is well again, or if %% not, it will calculate a new average age. Its data will be much %% more recent now, and so it is very likely that when this is %% communicated to the clients, the clients will close file handles. +%% (In extreme cases, where it's very likely that all clients have +%% used their open handles since they last sent in an update, which +%% would mean that the average will never cause any file handles to +%% be closed, the server can send out an average age of 0, resulting +%% in all available clients closing all their file handles.) +%% +%% Care is taken to ensure that (a) processes which are blocked +%% waiting for file descriptors to become available are not sent +%% requests to close file handles; and (b) given it is known how many +%% file handles a process has open, when the average age is forced to +%% 0, close messages are only sent to enough processes to release the +%% correct number of file handles and the list of processes is +%% randomly shuffled. This ensures we don't cause processes to +%% needlessly close file handles, and ensures that we don't always +%% make such requests of the same processes. %% %% The advantage of this scheme is that there is only communication %% from the client to the server on open, close, and when in the @@ -103,11 +127,7 @@ %% communication from the client to the server on normal file handle %% operations. This scheme forms a feed-back loop - the server does %% not care which file handles are closed, just that some are, and it -%% checks this repeatedly when over the limit. Given the guarantees of -%% now(), even if there is just one file handle open, a limit of 1, -%% and one client, it is certain that when the client calculates the -%% age of the handle, it will be greater than when the server -%% calculated it, hence it should be closed. +%% checks this repeatedly when over the limit. %% %% Handles which are closed as a result of the server are put into a %% "soft-closed" state in which the handle is closed (data flushed out @@ -117,8 +137,19 @@ %% - reopening them when necessary is handled transparently. %% %% The server also supports obtain and transfer. obtain/0 blocks until -%% a file descriptor is available. transfer/1 is transfers ownership -%% of a file descriptor between processes. It is non-blocking. +%% a file descriptor is available, at which point the requesting +%% process is considered to 'own' one more descriptor. transfer/1 +%% transfers ownership of a file descriptor between processes. It is +%% non-blocking. Obtain is used to obtain permission to accept file +%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1 +%% macro. File handles can use the entire limit, but will be evicted +%% by obtain calls up to the point at which no more obtain calls can +%% be satisfied by the obtains limit. Thus there will always be some +%% capacity available for file handles. Processes that use obtain are +%% never asked to return them, and they are not managed in any way by +%% the server. It is simply a mechanism to ensure that processes that +%% need file descriptors such as sockets can do so in such a way that +%% the overall number of open file descriptors is managed. %% %% The callers of register_callback/3, obtain/0, and the argument of %% transfer/1 are monitored, reducing the count of handles in use diff --git a/src/rabbit.erl b/src/rabbit.erl index c2574970..8c36a9f0 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -489,11 +489,16 @@ maybe_insert_default_data() -> insert_default_data() -> {ok, DefaultUser} = application:get_env(default_user), {ok, DefaultPass} = application:get_env(default_pass), + {ok, DefaultAdmin} = application:get_env(default_user_is_admin), {ok, DefaultVHost} = application:get_env(default_vhost), {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = application:get_env(default_permissions), ok = rabbit_access_control:add_vhost(DefaultVHost), ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), + case DefaultAdmin of + true -> rabbit_access_control:set_admin(DefaultUser); + _ -> ok + end, ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, DefaultConfigurePerm, DefaultWritePerm, diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 9cfe1ca8..73fd6f0e 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -35,11 +35,12 @@ -export([check_login/2, user_pass_login/2, check_vhost_access/2, check_resource_access/3]). --export([add_user/2, delete_user/1, change_password/2, list_users/0, - lookup_user/1]). --export([add_vhost/1, delete_vhost/1, list_vhosts/0]). +-export([add_user/2, delete_user/1, change_password/2, set_admin/1, + clear_admin/1, list_users/0, lookup_user/1]). +-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). -export([set_permissions/5, set_permissions/6, clear_permissions/2, - list_vhost_permissions/1, list_user_permissions/1]). + list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, + list_user_vhost_permissions/2]). %%---------------------------------------------------------------------------- @@ -52,6 +53,7 @@ -type(password() :: binary()). -type(regexp() :: binary()). -type(scope() :: binary()). +-type(scope_atom() :: 'client' | 'all'). -spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user() | @@ -68,26 +70,33 @@ -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). +-spec(set_admin/1 :: (username()) -> 'ok'). +-spec(clear_admin/1 :: (username()) -> 'ok'). -spec(list_users/0 :: () -> [username()]). -spec(lookup_user/1 :: (username()) -> rabbit_types:ok(rabbit_types:user()) | rabbit_types:error('not_found')). --spec(add_vhost/1 :: - (rabbit_types:vhost()) -> 'ok'). --spec(delete_vhost/1 :: - (rabbit_types:vhost()) -> 'ok'). +-spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()). -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). +-spec(list_permissions/0 :: + () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(), + scope_atom()}]). -spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) - -> [{username(), regexp(), regexp(), regexp()}]). + (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(), + scope_atom()}]). -spec(list_user_permissions/1 :: - (username()) - -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). + (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(), + scope_atom()}]). +-spec(list_user_vhost_permissions/2 :: + (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(), + scope_atom()}]). -endif. @@ -142,7 +151,7 @@ internal_lookup_vhost_access(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) of [] -> not_found; [R] -> {ok, R} @@ -160,7 +169,6 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. -permission_index(scope) -> #permission.scope; permission_index(configure) -> #permission.configure; permission_index(write) -> #permission.write; permission_index(read) -> #permission.read. @@ -175,7 +183,7 @@ check_resource_access(Username, R = #resource{virtual_host = VHostPath, name = Name}, Permission) -> Res = case mnesia:dirty_read({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) of [] -> false; @@ -184,11 +192,12 @@ check_resource_access(Username, {<<"amq.gen",_/binary>>, #permission{scope = client}} -> true; _ -> - PermRegexp = case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; + PermRegexp = + case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; RE -> RE - end, + end, case re:run(Name, PermRegexp, [{capture, none}]) of match -> true; nomatch -> false @@ -208,7 +217,8 @@ add_user(Username, Password) -> [] -> ok = mnesia:write(rabbit_user, #user{username = Username, - password = Password}, + password = Password, + is_admin = false}, write); _ -> mnesia:abort({user_already_exists, Username}) @@ -238,20 +248,39 @@ delete_user(Username) -> R. change_password(Username, Password) -> - R = rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - ok = mnesia:write(rabbit_user, - #user{username = Username, - password = Password}, - write) - end)), + R = update_user(Username, fun(User) -> + User#user{password = Password} + end), rabbit_log:info("Changed password for user ~p~n", [Username]), R. +set_admin(Username) -> + set_admin(Username, true). + +clear_admin(Username) -> + set_admin(Username, false). + +set_admin(Username, IsAdmin) -> + R = update_user(Username, fun(User) -> + User#user{is_admin = IsAdmin} + end), + rabbit_log:info("Set user admin flag for user ~p to ~p~n", + [Username, IsAdmin]), + R. + +update_user(Username, Fun) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + {ok, User} = lookup_user(Username), + ok = mnesia:write(rabbit_user, Fun(User), write) + end)). + list_users() -> - mnesia:dirty_all_keys(rabbit_user). + [{Username, IsAdmin} || + #user{username = Username, is_admin = IsAdmin} <- + mnesia:dirty_match_object(rabbit_user, #user{_ = '_'})]. lookup_user(Username) -> rabbit_misc:dirty_read({rabbit_user, Username}). @@ -301,7 +330,7 @@ delete_vhost(VHostPath) -> R. internal_delete_vhost(VHostPath) -> - lists:foreach(fun (#exchange{name=Name}) -> + lists:foreach(fun (#exchange{name = Name}) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), @@ -312,6 +341,9 @@ internal_delete_vhost(VHostPath) -> ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. +vhost_exists(VHostPath) -> + mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. + list_vhosts() -> mnesia:dirty_all_keys(rabbit_vhost). @@ -339,13 +371,13 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer fun () -> ok = mnesia:write( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ - username = Username, + username = Username, virtual_host = VHostPath}, permission = #permission{ - scope = Scope, + scope = Scope, configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}}, + write = WritePerm, + read = ReadPerm}}, write) end)). @@ -356,10 +388,15 @@ clear_permissions(Username, VHostPath) -> Username, VHostPath, fun () -> ok = mnesia:delete({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) end)). +list_permissions() -> + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || + {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + list_permissions(match_user_vhost('_', '_'))]. + list_vhost_permissions(VHostPath) -> [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} || {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- @@ -372,15 +409,21 @@ list_user_permissions(Username) -> list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. +list_user_vhost_permissions(Username, VHostPath) -> + [{ConfigurePerm, WritePerm, ReadPerm, Scope} || + {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + list_permissions(rabbit_misc:with_user_and_vhost( + Username, VHostPath, + match_user_vhost(Username, VHostPath)))]. + list_permissions(QueryThunk) -> [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - #user_permission{user_vhost = #user_vhost{username = Username, + #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, - permission = #permission{ - scope = Scope, - configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}} <- + permission = #permission{ scope = Scope, + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}} <- %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction(QueryThunk)]. @@ -388,7 +431,7 @@ match_user_vhost(Username, VHostPath) -> fun () -> mnesia:match_object( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ - username = Username, + username = Username, virtual_host = VHostPath}, permission = '_'}, read) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 08495862..9a40580e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -201,7 +201,7 @@ next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), State2 = ensure_stats_timer(State1), - case BQ:needs_idle_timeout(BQS)of + case BQ:needs_idle_timeout(BQS) of true -> {ensure_sync_timer(State2), 0}; false -> {stop_sync_timer(State2), hibernate} end. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 6caf7302..19150fa9 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -32,7 +32,7 @@ -module(rabbit_binding). -include("rabbit.hrl"). --export([recover/0, add/1, remove/1, add/2, remove/2, list/1]). +-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). -export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx @@ -47,15 +47,17 @@ -type(key() :: binary()). --type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found')). +-type(bind_errors() :: rabbit_types:error('queue_not_found' | + 'exchange_not_found' | + 'exchange_and_queue_not_found')). +-type(bind_res() :: 'ok' | bind_errors()). -type(inner_fun() :: fun((rabbit_types:exchange(), queue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). -spec(recover/0 :: () -> [rabbit_types:binding()]). +-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). -spec(add/1 :: (rabbit_types:binding()) -> bind_res()). -spec(remove/1 :: (rabbit_types:binding()) -> bind_res() | rabbit_types:error('binding_not_found')). @@ -96,6 +98,11 @@ recover() -> [B | Acc] end, [], rabbit_durable_route). +exists(Binding) -> + binding_action( + Binding, + fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end). + add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). @@ -122,8 +129,8 @@ add(Binding, InnerFun) -> E end end) of - {new, Exchange = #exchange{ type = Type }, B} -> - ok = (type_to_module(Type)):add_binding(Exchange, B), + {new, X = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(X, B), rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; @@ -174,8 +181,8 @@ list(VHostPath) -> [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_exchange(ExchangeName) -> - Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, +list_for_exchange(XName) -> + Route = #route{binding = #binding{exchange_name = XName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. @@ -185,8 +192,8 @@ list_for_queue(QueueName) -> mnesia:dirty_match_object(rabbit_reverse_route, reverse_route(Route))]. -list_for_exchange_and_queue(ExchangeName, QueueName) -> - Route = #route{binding = #binding{exchange_name = ExchangeName, +list_for_exchange_and_queue(XName, QueueName) -> + Route = #route{binding = #binding{exchange_name = XName, queue_name = QueueName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, @@ -215,14 +222,14 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). -has_for_exchange(ExchangeName) -> - Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, +has_for_exchange(XName) -> + Match = #route{binding = #binding{exchange_name = XName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). -remove_for_exchange(ExchangeName) -> +remove_for_exchange(XName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), @@ -230,7 +237,7 @@ remove_for_exchange(ExchangeName) -> Route#route.binding end || Route <- mnesia:match_object( rabbit_route, - #route{binding = #binding{exchange_name = ExchangeName, + #route{binding = #binding{exchange_name = XName, _ = '_'}}, write)]. @@ -242,11 +249,11 @@ remove_transient_for_queue(QueueName) -> %%---------------------------------------------------------------------------- -binding_action(Binding = #binding{exchange_name = ExchangeName, +binding_action(Binding = #binding{exchange_name = XName, queue_name = QueueName, args = Arguments}, Fun) -> call_with_exchange_and_queue( - ExchangeName, QueueName, + XName, QueueName, fun (X, Q) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), Fun(X, Q, Binding#binding{args = SortedArgs}) @@ -263,10 +270,10 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_queue(Exchange, Queue, Fun) -> +call_with_exchange_and_queue(XName, QueueName, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, Exchange}), - mnesia:read({rabbit_queue, Queue})} of + fun () -> case {mnesia:read({rabbit_exchange, XName}), + mnesia:read({rabbit_queue, QueueName})} of {[X], [Q]} -> Fun(X, Q); {[ ], [_]} -> {error, exchange_not_found}; {[_], [ ]} -> {error, queue_not_found}; @@ -320,16 +327,15 @@ remove_for_queue(QueueName, FwdDeleteFun) -> group_bindings_and_auto_delete([], Acc) -> Acc; group_bindings_and_auto_delete( - [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> - group_bindings_and_auto_delete(ExchangeName, Bs, [B], Acc). + [B = #binding{exchange_name = XName} | Bs], Acc) -> + group_bindings_and_auto_delete(XName, Bs, [B], Acc). group_bindings_and_auto_delete( - ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], - Bindings, Acc) -> - group_bindings_and_auto_delete(ExchangeName, Bs, [B | Bindings], Acc); -group_bindings_and_auto_delete(ExchangeName, Removed, Bindings, Acc) -> - %% either Removed is [], or its head has a non-matching ExchangeName - [X] = mnesia:read({rabbit_exchange, ExchangeName}), + XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) -> + group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc); +group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) -> + %% either Removed is [], or its head has a non-matching XName + [X] = mnesia:read({rabbit_exchange, XName}), NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], group_bindings_and_auto_delete(Removed, NewAcc). @@ -352,20 +358,20 @@ reverse_route(#route{binding = Binding}) -> reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. -reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, +reverse_binding(#reverse_binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}) -> - #binding{exchange_name = Exchange, - queue_name = Queue, + #binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}; -reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, +reverse_binding(#binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}) -> - #reverse_binding{exchange_name = Exchange, - queue_name = Queue, + #reverse_binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}. diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index d1938805..21c39780 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -53,9 +53,7 @@ start_link() -> supervisor2:start_link(?MODULE, []). start_channel(Pid, Args) -> - {ok, ChSupPid, _} = Result = supervisor2:start_child(Pid, [Args]), - link(ChSupPid), - Result. + supervisor2:start_child(Pid, [Args]). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 06826b8e..a3b6f369 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -209,6 +209,14 @@ action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> Inform("Changing password for user ~p", [Username]), call(Node, {rabbit_access_control, change_password, Args}); +action(set_admin, Node, [Username], _Opts, Inform) -> + Inform("Setting administrative status for user ~p", [Username]), + call(Node, {rabbit_access_control, set_admin, [Username]}); + +action(clear_admin, Node, [Username], _Opts, Inform) -> + Inform("Clearing administrative status for user ~p", [Username]), + call(Node, {rabbit_access_control, clear_admin, [Username]}); + action(list_users, Node, [], _Opts, Inform) -> Inform("Listing users", []), display_list(call(Node, {rabbit_access_control, list_users, []})); @@ -304,9 +312,11 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) || - X <- InfoItemKeys]) - end, Results), + lists:foreach( + fun (Result) -> display_row( + [format_info_item(proplists:get_value(X, Result)) || + X <- InfoItemKeys]) + end, Results), ok; display_info_list(Other, _) -> Other. @@ -315,25 +325,30 @@ display_row(Row) -> io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), io:nl(). -format_info_item(Key, Items) -> - case proplists:get_value(Key, Items) of - #resource{name = Name} -> - escape(Name); - Value when Key =:= address; Key =:= peer_address andalso - is_tuple(Value) -> - inet_parse:ntoa(Value); - Value when is_pid(Value) -> - rabbit_misc:pid_to_string(Value); - Value when is_binary(Value) -> - escape(Value); - Value when is_atom(Value) -> - escape(atom_to_list(Value)); - Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _] - when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> - io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); - Value -> - io_lib:format("~w", [Value]) - end. +-define(IS_U8(X), (X >= 0 andalso X =< 255)). +-define(IS_U16(X), (X >= 0 andalso X =< 65535)). + +format_info_item(#resource{name = Name}) -> + escape(Name); +format_info_item({N1, N2, N3, N4} = Value) when + ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) -> + inet_parse:ntoa(Value); +format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when + ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4), + ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) -> + inet_parse:ntoa(Value); +format_info_item(Value) when is_pid(Value) -> + rabbit_misc:pid_to_string(Value); +format_info_item(Value) when is_binary(Value) -> + escape(Value); +format_info_item(Value) when is_atom(Value) -> + escape(atom_to_list(Value)); +format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = + Value) when is_binary(TableEntryKey) andalso + is_atom(TableEntryType) -> + io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item(Value) -> + io_lib:format("~w", [Value]). display_list(L) when is_list(L) -> lists:foreach(fun (I) when is_binary(I) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 40bee25f..2a19d5b1 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -92,15 +92,15 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]). recover() -> - Exs = rabbit_misc:table_fold( - fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), - [Exchange | Acc] - end, [], rabbit_durable_exchange), + Xs = rabbit_misc:table_fold( + fun (X, Acc) -> + ok = mnesia:write(rabbit_exchange, X, write), + [X | Acc] + end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), recover_with_bindings( lists:keysort(#binding.exchange_name, Bs), - lists:keysort(#exchange.name, Exs), []). + lists:keysort(#exchange.name, Xs), []). recover_with_bindings([B = #binding{exchange_name = Name} | Rest], Xs = [#exchange{name = Name} | _], @@ -112,30 +112,30 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(ExchangeName, Type, Durable, AutoDelete, Args) -> - Exchange = #exchange{name = ExchangeName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args}, +declare(XName, Type, Durable, AutoDelete, Args) -> + X = #exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return %% value. TypeModule = type_to_module(Type), - ok = TypeModule:validate(Exchange), + ok = TypeModule:validate(X), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of + case mnesia:wread({rabbit_exchange, XName}) of [] -> - ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = mnesia:write(rabbit_exchange, X, write), ok = case Durable of true -> mnesia:write(rabbit_durable_exchange, - Exchange, write); + X, write); false -> ok end, - {new, Exchange}; + {new, X}; [ExistingX] -> {existing, ExistingX} end @@ -257,20 +257,20 @@ publish(X = #exchange{type = Type}, Seen, Delivery) -> R end. -call_with_exchange(Exchange, Fun) -> +call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:read({rabbit_exchange, Exchange}) of + fun () -> case mnesia:read({rabbit_exchange, XName}) of [] -> {error, not_found}; [X] -> Fun(X) end end). -delete(ExchangeName, IfUnused) -> +delete(XName, IfUnused) -> Fun = case IfUnused of true -> fun conditional_delete/1; false -> fun unconditional_delete/1 end, - case call_with_exchange(ExchangeName, Fun) of + case call_with_exchange(XName, Fun) of {deleted, X = #exchange{type = Type}, Bs} -> (type_to_module(Type)):delete(X, Bs), ok; @@ -280,21 +280,21 @@ delete(ExchangeName, IfUnused) -> maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; -maybe_auto_delete(#exchange{auto_delete = true} = Exchange) -> - case conditional_delete(Exchange) of - {error, in_use} -> not_deleted; - {deleted, Exchange, []} -> auto_deleted +maybe_auto_delete(#exchange{auto_delete = true} = X) -> + case conditional_delete(X) of + {error, in_use} -> not_deleted; + {deleted, X, []} -> auto_deleted end. -conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - case rabbit_binding:has_for_exchange(ExchangeName) of - false -> unconditional_delete(Exchange); +conditional_delete(X = #exchange{name = XName}) -> + case rabbit_binding:has_for_exchange(XName) of + false -> unconditional_delete(X); true -> {error, in_use} end. -unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Bindings = rabbit_binding:remove_for_exchange(ExchangeName), - ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), - {deleted, Exchange, Bindings}. +unconditional_delete(X = #exchange{name = XName}) -> + Bindings = rabbit_binding:remove_for_exchange(XName), + ok = mnesia:delete({rabbit_durable_exchange, XName}), + ok = mnesia:delete({rabbit_exchange, XName}), + rabbit_event:notify(exchange_deleted, [{name, XName}]), + {deleted, X, Bindings}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index a9c7db76..18810833 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -98,8 +98,7 @@ }). -record(file_summary, - {file, valid_total_size, contiguous_top, left, right, file_size, - locked, readers}). + {file, valid_total_size, left, right, file_size, locked, readers}). %%---------------------------------------------------------------------------- @@ -159,8 +158,7 @@ %% {Guid, RefCount, File, Offset, TotalSize} %% By default, it's in ets, but it's also pluggable. %% FileSummary: this is an ets table which maps File to #file_summary{}: -%% {File, ValidTotalSize, ContiguousTop, Left, Right, -%% FileSize, Locked, Readers} +%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers} %% %% The basic idea is that messages are appended to the current file up %% until that file becomes too big (> file_size_limit). At that point, @@ -176,9 +174,7 @@ %% %% As messages are removed from files, holes appear in these %% files. The field ValidTotalSize contains the total amount of useful -%% data left in the file, whilst ContiguousTop contains the amount of -%% valid data right at the start of each file. These are needed for -%% garbage collection. +%% data left in the file. This is needed for garbage collection. %% %% When we discover that a file is now empty, we delete it. When we %% discover that it can be combined with the useful data in either its @@ -224,9 +220,7 @@ %% above B (i.e. truncate to the limit of the good contiguous region %% at the start of the file), then write C and D on top and then write %% E, F and G from the right file on top. Thus contiguous blocks of -%% good data at the bottom of files are not rewritten (yes, this is -%% the data the size of which is tracked by the ContiguousTop -%% variable. Judicious use of a mirror is required). +%% good data at the bottom of files are not rewritten. %% %% +-------+ +-------+ +-------+ %% | X | | G | | G | @@ -628,20 +622,14 @@ handle_cast({write, Guid}, offset = CurOffset, total_size = TotalSize }, State), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, right = undefined, locked = false, file_size = FileSize }] = ets:lookup(FileSummaryEts, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, - ContiguousTop1 = case CurOffset =:= ContiguousTop of - true -> ValidTotalSize1; - false -> ContiguousTop - end, true = ets:update_element( FileSummaryEts, CurFile, [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}, {#file_summary.file_size, FileSize + TotalSize}]), NextOffset = CurOffset + TotalSize, noreply( @@ -902,8 +890,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize } = - index_lookup(Guid, State), + total_size = TotalSize } = index_lookup(Guid, State), case RefCount of 1 -> %% don't remove from CUR_FILE_CACHE_ETS_NAME here because @@ -911,7 +898,6 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, %% msg. ok = remove_cache_entry(DedupCacheEts, Guid), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, locked = Locked }] = ets:lookup(FileSummaryEts, File), case Locked of @@ -919,12 +905,11 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, add_to_pending_gc_completion({remove, Guid}, State); false -> ok = index_delete(Guid, State), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), ValidTotalSize1 = ValidTotalSize - TotalSize, - true = ets:update_element( - FileSummaryEts, File, - [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}]), + true = + ets:update_element( + FileSummaryEts, File, + [{#file_summary.valid_total_size, ValidTotalSize1}]), State1 = delete_file_if_empty(File, State), State1 #msstate { sum_valid_data = SumValid - TotalSize } end; @@ -1271,16 +1256,17 @@ scan_file_for_valid_messages(Dir, FileName) -> %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. -find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []). +drop_contiguous_block_prefix(L) -> drop_contiguous_block_prefix(L, 0). -find_contiguous_block_prefix([], ExpectedOffset, Guids) -> - {ExpectedOffset, Guids}; -find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], - ExpectedOffset, Guids) -> +drop_contiguous_block_prefix([], ExpectedOffset) -> + {ExpectedOffset, []}; +drop_contiguous_block_prefix([#msg_location { offset = ExpectedOffset, + total_size = TotalSize } | Tail], + ExpectedOffset) -> ExpectedOffset1 = ExpectedOffset + TotalSize, - find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]); -find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> - {ExpectedOffset, Guids}. + drop_contiguous_block_prefix(Tail, ExpectedOffset1); +drop_contiguous_block_prefix(MsgsAfterGap, ExpectedOffset) -> + {ExpectedOffset, MsgsAfterGap}. build_index(true, _StartupFunState, State = #msstate { file_summary_ets = FileSummaryEts }) -> @@ -1356,9 +1342,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, {VMAcc, VTSAcc} end end, {[], 0}, Messages), - %% foldl reverses lists, find_contiguous_block_prefix needs - %% msgs eldest first, so, ValidMessages is the right way round - {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages), {Right, FileSize1} = case Files of %% if it's the last file, we'll truncate to remove any @@ -1375,7 +1358,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, ok = gatherer:in(Gatherer, #file_summary { file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, left = Left, right = Right, file_size = FileSize1, @@ -1403,7 +1385,6 @@ maybe_roll_to_new_file( true = ets:insert_new(FileSummaryEts, #file_summary { file = NextFile, valid_total_size = 0, - contiguous_top = 0, left = CurFile, right = undefined, file_size = 0, @@ -1530,7 +1511,6 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> true = ets:update_element( FileSummaryEts, DstFile, [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.contiguous_top, TotalValidData}, {#file_summary.file_size, TotalValidData}]), SrcFileSize + DstFileSize - TotalValidData; false -> concurrent_readers @@ -1541,7 +1521,6 @@ combine_files(#file_summary { file = Source, left = Destination }, #file_summary { file = Destination, valid_total_size = DestinationValid, - contiguous_top = DestinationContiguousTop, right = Source }, State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> SourceName = filenum_to_name(Source), @@ -1557,41 +1536,32 @@ combine_files(#file_summary { file = Source, %% the DestinationContiguousTop to a tmp file then truncate, %% copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source - case DestinationContiguousTop =:= DestinationValid of - true -> - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize); - false -> - {DestinationWorkList, DestinationValid} = - find_unremoved_messages_in_file(Destination, State), - Worklist = - lists:dropwhile( - fun (#msg_location { offset = Offset }) - when Offset =/= DestinationContiguousTop -> - %% it cannot be that Offset =:= - %% DestinationContiguousTop because if it - %% was then DestinationContiguousTop would - %% have been extended by TotalSize - Offset < DestinationContiguousTop - end, DestinationWorkList), - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ok = copy_messages( - Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination, State), - TmpSize = DestinationValid - DestinationContiguousTop, - %% so now Tmp contains everything we need to salvage from - %% Destination, and index_state has been updated to - %% reflect the compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file_handle_cache:position(TmpHdl, 0), - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = - file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid - ok = file_handle_cache:sync(DestinationHdl), - ok = file_handle_cache:delete(TmpHdl) + {DestinationWorkList, DestinationValid} = + find_unremoved_messages_in_file(Destination, State), + {DestinationContiguousTop, DestinationWorkListTail} = + drop_contiguous_block_prefix(DestinationWorkList), + case DestinationWorkListTail of + [] -> ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize); + _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE), + ok = copy_messages( + DestinationWorkListTail, DestinationContiguousTop, + DestinationValid, DestinationHdl, TmpHdl, Destination, + State), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage + %% from Destination, and index_state has been updated to + %% reflect the compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file_handle_cache:position(TmpHdl, 0), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = + file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file_handle_cache:sync(DestinationHdl), + ok = file_handle_cache:delete(TmpHdl) end, {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State), diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index c7a5a600..5cfd6a5c 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -310,8 +310,8 @@ kill_wait(Pid, TimeLeft, Forceful) -> is_dead(Pid) -> PidS = integer_to_list(Pid), with_os([{unix, fun () -> - Res = os:cmd("ps --no-headers --pid " ++ PidS), - Res == "" + system("kill -0 " ++ PidS + ++ " >/dev/null 2>&1") /= 0 end}, {win32, fun () -> Res = os:cmd("tasklist /nh /fi \"pid eq " ++ @@ -322,6 +322,16 @@ is_dead(Pid) -> end end}]). +% Like system(3) +system(Cmd) -> + ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", + Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), + receive {Port, {exit_status, Status}} -> Status end. + +% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" +escape_quotes(Cmd) -> + lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). + call_all_nodes(Func) -> case read_pids_file() of [] -> throw(no_nodes_running); diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 6baa4b88..2286896b 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -46,7 +46,7 @@ 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). -type(error() :: rabbit_types:error(any())). --type(socket() :: rabbit_networking:ip_port() | rabbit_types:ssl_socket()). +-type(socket() :: port() | #ssl_socket{}). -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). @@ -72,72 +72,58 @@ %%--------------------------------------------------------------------------- +-define(IS_SSL(Sock), is_record(Sock, ssl_socket)). -async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> +async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) -> Pid = self(), Ref = make_ref(), spawn(fun () -> Pid ! {inet_async, Sock, Ref, - ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} - end), + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + end), {ok, Ref}; - async_recv(Sock, Length, infinity) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, -1); - async_recv(Sock, Length, Timeout) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, Timeout). -close(Sock) when is_record(Sock, ssl_socket) -> +close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); - close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). - -controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) -> +controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> ssl:controlling_process(Sock#ssl_socket.ssl, Pid); - controlling_process(Sock, Pid) when is_port(Sock) -> gen_tcp:controlling_process(Sock, Pid). - -getstat(Sock, Stats) when is_record(Sock, ssl_socket) -> +getstat(Sock, Stats) when ?IS_SSL(Sock) -> inet:getstat(Sock#ssl_socket.tcp, Stats); - getstat(Sock, Stats) when is_port(Sock) -> inet:getstat(Sock, Stats). - -peername(Sock) when is_record(Sock, ssl_socket) -> +peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock#ssl_socket.ssl); - peername(Sock) when is_port(Sock) -> inet:peername(Sock). - -port_command(Sock, Data) when is_record(Sock, ssl_socket) -> +port_command(Sock, Data) when ?IS_SSL(Sock) -> case ssl:send(Sock#ssl_socket.ssl, Data) of - ok -> - self() ! {inet_reply, Sock, ok}, - true; - {error, Reason} -> - erlang:error(Reason) + ok -> self() ! {inet_reply, Sock, ok}, + true; + {error, Reason} -> erlang:error(Reason) end; - port_command(Sock, Data) when is_port(Sock) -> erlang:port_command(Sock, Data). -send(Sock, Data) when is_record(Sock, ssl_socket) -> +send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data); - send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). -sockname(Sock) when is_record(Sock, ssl_socket) -> +sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); - sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a21961b5..252f81a3 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -339,7 +339,7 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw(E); {channel_exit, ChannelOrFrPid, Reason} -> mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); - {'EXIT', ChSupPid, Reason} -> + {'DOWN', _MRef, process, ChSupPid, Reason} -> mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); terminate_connection -> State; @@ -489,7 +489,7 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'EXIT', ChSupPid, Reason} -> + {'DOWN', _MRef, process, ChSupPid, Reason} -> case channel_cleanup(ChSupPid) of undefined -> exit({abnormal_dependent_exit, ChSupPid, Reason}); @@ -867,6 +867,7 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector}), + erlang:monitor(process, ChSupPid), put({channel, Channel}, {ch_fr_pid, ChFrPid}), put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), put({ch_fr_pid, ChFrPid}, {channel, Channel}), diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index bfccb0da..bd57f737 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -85,10 +85,10 @@ deliver(QPids, Delivery) -> %% TODO: This causes a full scan for each entry with the same exchange match_bindings(Name, Match) -> Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, + exchange_name = XName, queue_name = QName}} <- mnesia:table(rabbit_route), - ExchangeName == Name, + XName == Name, Match(Binding)]), lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b541f0f7..a72656b7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -972,6 +972,8 @@ test_user_management() -> {error, {user_already_exists, _}} = control_action(add_user, ["foo", "bar"]), ok = control_action(change_password, ["foo", "baz"]), + ok = control_action(set_admin, ["foo"]), + ok = control_action(clear_admin, ["foo"]), ok = control_action(list_users, []), %% vhost creation diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 9dfd33bd..0b6a15ec 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -38,7 +38,7 @@ -export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, - amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0, + amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, connection_exit/0]). @@ -107,8 +107,6 @@ kind :: Kind, name :: Name}). --type(ssl_socket() :: #ssl_socket{}). - -type(listener() :: #listener{node :: node(), protocol :: atom(), @@ -142,7 +140,8 @@ -type(user() :: #user{username :: rabbit_access_control:username(), - password :: rabbit_access_control:password()}). + password :: rabbit_access_control:password(), + is_admin :: boolean()}). -type(ok(A) :: {'ok', A}). -type(error(A) :: {'error', A}). diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 4a1c5832..93adfcb1 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -34,7 +34,9 @@ %% 4) Added an 'intrinsic' restart type. Like the transient type, this %% type means the child should only be restarted if the child exits %% abnormally. Unlike the transient type, if the child exits -%% normally, the supervisor itself also exits normally. +%% normally, the supervisor itself also exits normally. If the +%% child is a supervisor and it exits normally (i.e. with reason of +%% 'shutdown') then the child's parent also exits normally. %% %% All modifications are (C) 2010 Rabbit Technologies Ltd. %% @@ -545,6 +547,9 @@ do_restart(permanent, Reason, Child, State) -> restart(Child, State); do_restart(intrinsic, normal, Child, State) -> {shutdown, state_del_child(Child, State)}; +do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor}, + State) -> + {shutdown, state_del_child(Child, State)}; do_restart(_, normal, Child, State) -> NState = state_del_child(Child, State), {ok, NState}; |