diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-26 12:36:41 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-26 12:36:41 +0100 |
commit | 4c41d35a0c0c80c7d58790a7cb8bcfa0a22bbee1 (patch) | |
tree | d63779e33ab24285524e0236214b8871832b90fb | |
parent | dbea323eaf57a5a4408e1fb237e6e381d3df79bd (diff) | |
parent | 2041124fd94b1011d3f968e52bb8da81918ee33d (diff) | |
download | rabbitmq-server-4c41d35a0c0c80c7d58790a7cb8bcfa0a22bbee1.tar.gz |
Merging bug 23429 into default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 12 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 75 | ||||
-rw-r--r-- | src/rabbit_control.erl | 6 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 359 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 96 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 9 |
7 files changed, 283 insertions, 276 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 3b7244c7..acb99bc8 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -589,7 +589,7 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">-s <replaceable>scope</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> @@ -597,16 +597,6 @@ <listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem> </varlistentry> <varlistentry> - <term>scope</term> - <listitem><para>Scope of the permissions: either - <command>client</command> (the default) or - <command>all</command>. This determines whether - permissions are checked for server-generated resource - names (<command>all</command>) or only for - client-specified resource names - (<command>client</command>).</para></listitem> - </varlistentry> - <varlistentry> <term>user</term> <listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem> </varlistentry> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index ae672fc9..c1c9bd65 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -30,7 +30,7 @@ %% -record(user, {username, password, is_admin}). --record(permission, {scope, configure, write, read}). +-record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 73fd6f0e..85452abf 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -38,7 +38,7 @@ -export([add_user/2, delete_user/1, change_password/2, set_admin/1, clear_admin/1, list_users/0, lookup_user/1]). -export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). --export([set_permissions/5, set_permissions/6, clear_permissions/2, +-export([set_permissions/5, clear_permissions/2, list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, list_user_vhost_permissions/2]). @@ -52,9 +52,6 @@ -type(username() :: binary()). -type(password() :: binary()). -type(regexp() :: binary()). --type(scope() :: binary()). --type(scope_atom() :: 'client' | 'all'). - -spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user() | rabbit_types:channel_exit()). @@ -82,21 +79,15 @@ -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). --spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(), - regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). -spec(list_permissions/0 :: - () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(), - scope_atom()}]). + () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]). -spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(), - scope_atom()}]). + (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]). -spec(list_user_permissions/1 :: - (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(), - scope_atom()}]). + (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). -spec(list_user_vhost_permissions/2 :: - (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(), - scope_atom()}]). + (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]). -endif. @@ -188,20 +179,15 @@ check_resource_access(Username, [] -> false; [#user_permission{permission = P}] -> - case {Name, P} of - {<<"amq.gen",_/binary>>, #permission{scope = client}} -> - true; - _ -> - PermRegexp = - case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; - RE -> RE - end, - case re:run(Name, PermRegexp, [{capture, none}]) of - match -> true; - nomatch -> false - end + PermRegexp = + case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, + case re:run(Name, PermRegexp, [{capture, none}]) of + match -> true; + nomatch -> false end end, if Res -> ok; @@ -334,7 +320,7 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _, _, _}) -> + lists:foreach(fun ({Username, _, _, _}) -> ok = clear_permissions(Username, VHostPath) end, list_vhost_permissions(VHostPath)), @@ -355,16 +341,7 @@ validate_regexp(RegexpBin) -> end. set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> - set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm, - WritePerm, ReadPerm). - -set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), - Scope = case ScopeBin of - <<"client">> -> client; - <<"all">> -> all; - _ -> throw({error, {invalid_scope, ScopeBin}}) - end, rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -374,7 +351,6 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer username = Username, virtual_host = VHostPath}, permission = #permission{ - scope = Scope, configure = ConfigurePerm, write = WritePerm, read = ReadPerm}}, @@ -393,35 +369,34 @@ clear_permissions(Username, VHostPath) -> end)). list_permissions() -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(match_user_vhost('_', '_'))]. list_vhost_permissions(VHostPath) -> - [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{Username, ConfigurePerm, WritePerm, ReadPerm} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_vhost( VHostPath, match_user_vhost('_', VHostPath)))]. list_user_permissions(Username) -> - [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. list_user_vhost_permissions(Username, VHostPath) -> - [{ConfigurePerm, WritePerm, ReadPerm, Scope} || - {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{ConfigurePerm, WritePerm, ReadPerm} || + {_, _, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_user_and_vhost( Username, VHostPath, match_user_vhost(Username, VHostPath)))]. list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, - permission = #permission{ scope = Scope, - configure = ConfigurePerm, + permission = #permission{ configure = ConfigurePerm, write = WritePerm, read = ReadPerm}} <- %% TODO: use dirty ops instead diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 8facaf16..6b212745 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -39,7 +39,6 @@ -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). --define(SCOPE_OPT, "-s"). %%---------------------------------------------------------------------------- @@ -67,7 +66,7 @@ start() -> {[Command0 | Args], Opts} = rabbit_misc:get_options( [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr}, - {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}], + {option, ?VHOST_OPT, "/"}], FullCommand), Opts1 = lists:map(fun({K, V}) -> case K of @@ -289,10 +288,9 @@ action(list_consumers, Node, _Args, Opts, Inform) -> action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Scope = proplists:get_value(?SCOPE_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, set_permissions, - [Scope, Username, VHost, CPerm, WPerm, RPerm]}); + [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, [Username], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 66cc06cf..b9f3e1a3 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -37,7 +37,8 @@ client_init/2, client_terminate/2, client_delete_and_terminate/3, write/4, read/3, contains/2, remove/2, release/2, sync/3]). --export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal +-export([sync/1, set_maximum_since_use/2, + has_readers/2, combine_files/3, delete_file/2]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). @@ -74,7 +75,6 @@ sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes pending_gc_completion, %% things to do once GC completes - gc_active, %% is the GC currently working? gc_pid, %% pid of our GC file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table @@ -100,10 +100,27 @@ -record(file_summary, {file, valid_total_size, left, right, file_size, locked, readers}). +-record(gc_state, + { dir, + index_module, + index_state, + file_summary_ets, + msg_store + }). + %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([gc_state/0, file_num/0]). + +-opaque(gc_state() :: #gc_state { dir :: file:filename(), + index_module :: atom(), + index_state :: any(), + file_summary_ets :: ets:tid(), + msg_store :: server() + }). + -type(server() :: pid() | atom()). -type(file_num() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { @@ -138,12 +155,11 @@ -spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). -spec(sync/1 :: (server()) -> 'ok'). --spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> - 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(gc/3 :: (non_neg_integer(), non_neg_integer(), - {ets:tid(), file:filename(), atom(), any()}) -> - 'concurrent_readers' | non_neg_integer()). +-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). +-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> + non_neg_integer()). +-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> non_neg_integer()). -endif. @@ -375,9 +391,6 @@ sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). sync(Server) -> gen_server2:cast(Server, sync). -gc_done(Server, Reclaimed, Source, Destination) -> - gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). - set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -482,9 +495,14 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, read_from_disk(MsgLocation, CState1, DedupCacheEts), Release(), %% this MUST NOT fail with badarg {{ok, Msg}, CState2}; - MsgLocation -> %% different file! + #msg_location {} = MsgLocation -> %% different file! Release(), %% this MUST NOT fail with badarg - client_read1(Server, MsgLocation, Defer, CState) + client_read1(Server, MsgLocation, Defer, CState); + not_found -> %% it seems not to exist. Defer, just to be sure. + try Release() %% this can badarg, same as locked case, above + catch error:badarg -> ok + end, + Defer() end end. @@ -547,8 +565,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, - pending_gc_completion = [], - gc_active = false, + pending_gc_completion = orddict:new(), gc_pid = undefined, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -570,8 +587,13 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {ok, Offset} = file_handle_cache:position(CurHdl, Offset), ok = file_handle_cache:truncate(CurHdl), - {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, - FileSummaryEts), + {ok, GCPid} = rabbit_msg_store_gc:start_link( + #gc_state { dir = Dir, + index_module = IndexModule, + index_state = IndexState, + file_summary_ets = FileSummaryEts, + msg_store = self() + }), {ok, maybe_compact( State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }), @@ -588,10 +610,11 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - sync -> 8; - {gc_done, _Reclaimed, _Source, _Destination} -> 8; - {set_maximum_since_use, _Age} -> 8; - _ -> 0 + sync -> 8; + {combine_files, _Source, _Destination, _Reclaimed} -> 8; + {delete_file, _File, _Reclaimed} -> 8; + {set_maximum_since_use, _Age} -> 8; + _ -> 0 end. handle_call(successfully_recovered_state, _From, State) -> @@ -686,37 +709,23 @@ handle_cast({sync, Guids, K}, handle_cast(sync, State) -> noreply(internal_sync(State)); -handle_cast({gc_done, Reclaimed, Src, Dst}, +handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, - gc_active = {Src, Dst}, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts }) -> - %% GC done, so now ensure that any clients that have open fhs to - %% those files close them before using them again. This has to be - %% done here (given it's done in the msg_store, and not the gc), - %% and not when starting up the GC, because if done when starting - %% up the GC, the client could find the close, and close and - %% reopen the fh, whilst the GC is waiting for readers to - %% disappear, before it's actually done the GC. - true = mark_handle_to_close(FileHandlesEts, Src), - true = mark_handle_to_close(FileHandlesEts, Dst), - %% we always move data left, so Src has gone and was on the - %% right, so need to make dest = source.right.left, and also - %% dest.right = source.right - [#file_summary { left = Dst, - right = SrcRight, - locked = true, - readers = 0 }] = ets:lookup(FileSummaryEts, Src), - %% this could fail if SrcRight =:= undefined - ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}), - true = ets:update_element(FileSummaryEts, Dst, - [{#file_summary.locked, false}, - {#file_summary.right, SrcRight}]), - true = ets:delete(FileSummaryEts, Src), - noreply( - maybe_compact(run_pending( - State #msstate { sum_file_size = SumFileSize - Reclaimed, - gc_active = false }))); + ok = cleanup_after_file_deletion(Source, State), + %% see comment in cleanup_after_file_deletion + true = mark_handle_to_close(FileHandlesEts, Destination), + true = ets:update_element(FileSummaryEts, Destination, + {#file_summary.locked, false}), + State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, + noreply(maybe_compact(run_pending([Source, Destination], State1))); + +handle_cast({delete_file, File, Reclaimed}, + State = #msstate { sum_file_size = SumFileSize }) -> + ok = cleanup_after_file_deletion(File, State), + State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, + noreply(maybe_compact(run_pending([File], State1))); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -867,7 +876,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, ets:lookup(FileSummaryEts, File), case Locked of true -> add_to_pending_gc_completion({read, Guid, From}, - State); + File, State); false -> {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts), gen_server2:reply(From, {ok, Msg}), @@ -897,19 +906,18 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), {Msg, State1}. -contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> +contains_message(Guid, From, + State = #msstate { pending_gc_completion = Pending }) -> case index_lookup_positive_ref_count(Guid, State) of not_found -> gen_server2:reply(From, false), State; #msg_location { file = File } -> - case GCActive of - {A, B} when File =:= A orelse File =:= B -> - add_to_pending_gc_completion( - {contains, Guid, From}, State); - _ -> - gen_server2:reply(From, true), - State + case orddict:is_key(File, Pending) of + true -> add_to_pending_gc_completion( + {contains, Guid, From}, File, State); + false -> gen_server2:reply(From, true), + State end end. @@ -928,7 +936,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true } ] -> - add_to_pending_gc_completion({remove, Guid}, State); + add_to_pending_gc_completion({remove, Guid}, File, State); [#file_summary {}] -> ok = Dec(), [_] = ets:update_counter( @@ -944,20 +952,25 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, end. add_to_pending_gc_completion( - Op, State = #msstate { pending_gc_completion = Pending }) -> - State #msstate { pending_gc_completion = [Op | Pending] }. - -run_pending(State = #msstate { pending_gc_completion = [] }) -> - State; -run_pending(State = #msstate { pending_gc_completion = Pending }) -> - State1 = State #msstate { pending_gc_completion = [] }, - lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)). + Op, File, State = #msstate { pending_gc_completion = Pending }) -> + State #msstate { pending_gc_completion = + rabbit_misc:orddict_cons(File, Op, Pending) }. -run_pending({read, Guid, From}, State) -> +run_pending(Files, State) -> + lists:foldl( + fun (File, State1 = #msstate { pending_gc_completion = Pending }) -> + Pending1 = orddict:erase(File, Pending), + lists:foldl( + fun run_pending_action/2, + State1 #msstate { pending_gc_completion = Pending1 }, + lists:reverse(orddict:fetch(File, Pending))) + end, State, Files). + +run_pending_action({read, Guid, From}, State) -> read_message(Guid, From, State); -run_pending({contains, Guid, From}, State) -> +run_pending_action({contains, Guid, From}, State) -> contains_message(Guid, From, State); -run_pending({remove, Guid}, State) -> +run_pending_action({remove, Guid}, State) -> remove_message(Guid, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> @@ -969,6 +982,10 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). +orddict_store(Key, Val, Dict) -> + false = orddict:is_key(Key, Dict), + orddict:store(Key, Val, Dict). + %%---------------------------------------------------------------------------- %% file helper functions %%---------------------------------------------------------------------------- @@ -1429,12 +1446,12 @@ maybe_roll_to_new_file( maybe_roll_to_new_file(_, State) -> State. -maybe_compact(State = #msstate { sum_valid_data = SumValid, - sum_file_size = SumFileSize, - gc_active = false, - gc_pid = GCPid, - file_summary_ets = FileSummaryEts, - file_size_limit = FileSizeLimit }) +maybe_compact(State = #msstate { sum_valid_data = SumValid, + sum_file_size = SumFileSize, + gc_pid = GCPid, + pending_gc_completion = Pending, + file_summary_ets = FileSummaryEts, + file_size_limit = FileSizeLimit }) when (SumFileSize > 2 * FileSizeLimit andalso (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) -> %% TODO: the algorithm here is sub-optimal - it may result in a @@ -1443,27 +1460,30 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, '$end_of_table' -> State; First -> - case find_files_to_gc(FileSummaryEts, FileSizeLimit, - ets:lookup(FileSummaryEts, First)) of + case find_files_to_combine(FileSummaryEts, FileSizeLimit, + ets:lookup(FileSummaryEts, First)) of not_found -> State; {Src, Dst} -> + Pending1 = orddict_store(Dst, [], + orddict_store(Src, [], Pending)), State1 = close_handle(Src, close_handle(Dst, State)), true = ets:update_element(FileSummaryEts, Src, {#file_summary.locked, true}), true = ets:update_element(FileSummaryEts, Dst, {#file_summary.locked, true}), - ok = rabbit_msg_store_gc:gc(GCPid, Src, Dst), - State1 #msstate { gc_active = {Src, Dst} } + ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst), + State1 #msstate { pending_gc_completion = Pending1 } end end; maybe_compact(State) -> State. -find_files_to_gc(FileSummaryEts, FileSizeLimit, - [#file_summary { file = Dst, - valid_total_size = DstValid, - right = Src }]) -> +find_files_to_combine(FileSummaryEts, FileSizeLimit, + [#file_summary { file = Dst, + valid_total_size = DstValid, + right = Src, + locked = DstLocked }]) -> case Src of undefined -> not_found; @@ -1471,13 +1491,16 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit, [#file_summary { file = Src, valid_total_size = SrcValid, left = Dst, - right = SrcRight }] = Next = + right = SrcRight, + locked = SrcLocked }] = Next = ets:lookup(FileSummaryEts, Src), case SrcRight of undefined -> not_found; - _ -> case DstValid + SrcValid =< FileSizeLimit of + _ -> case (DstValid + SrcValid =< FileSizeLimit) andalso + (DstValid > 0) andalso (SrcValid > 0) andalso + not (DstLocked orelse SrcLocked) of true -> {Src, Dst}; - false -> find_files_to_gc( + false -> find_files_to_combine( FileSummaryEts, FileSizeLimit, Next) end end @@ -1486,85 +1509,86 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit, delete_file_if_empty(File, State = #msstate { current_file = File }) -> State; delete_file_if_empty(File, State = #msstate { - dir = Dir, - sum_file_size = SumFileSize, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts }) -> + gc_pid = GCPid, + file_summary_ets = FileSummaryEts, + pending_gc_completion = Pending }) -> [#file_summary { valid_total_size = ValidData, - left = Left, - right = Right, - file_size = FileSize, locked = false }] = ets:lookup(FileSummaryEts, File), case ValidData of - %% we should NEVER find the current file in here hence right - %% should always be a file, not undefined - 0 -> case {Left, Right} of - {undefined, _} when Right =/= undefined -> - %% the eldest file is empty. - true = ets:update_element( - FileSummaryEts, Right, - {#file_summary.left, undefined}); - {_, _} when Right =/= undefined -> - true = ets:update_element(FileSummaryEts, Right, - {#file_summary.left, Left}), - true = ets:update_element(FileSummaryEts, Left, - {#file_summary.right, Right}) - end, - true = mark_handle_to_close(FileHandlesEts, File), - true = ets:delete(FileSummaryEts, File), - {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), - [index_delete(Guid, State) || - {Guid, _TotalSize, _Offset} <- Messages], - State1 = close_handle(File, State), - ok = file:delete(form_filename(Dir, filenum_to_name(File))), - State1 #msstate { sum_file_size = SumFileSize - FileSize }; + 0 -> %% don't delete the file_summary_ets entry for File here + %% because we could have readers which need to be able to + %% decrement the readers count. + true = ets:update_element(FileSummaryEts, File, + {#file_summary.locked, true}), + ok = rabbit_msg_store_gc:delete(GCPid, File), + Pending1 = orddict_store(File, [], Pending), + close_handle(File, + State #msstate { pending_gc_completion = Pending1 }); _ -> State end. +cleanup_after_file_deletion(File, + #msstate { file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts }) -> + %% Ensure that any clients that have open fhs to the file close + %% them before using them again. This has to be done here (given + %% it's done in the msg_store, and not the gc), and not when + %% starting up the GC, because if done when starting up the GC, + %% the client could find the close, and close and reopen the fh, + %% whilst the GC is waiting for readers to disappear, before it's + %% actually done the GC. + true = mark_handle_to_close(FileHandlesEts, File), + [#file_summary { left = Left, + right = Right, + locked = true, + readers = 0 }] = ets:lookup(FileSummaryEts, File), + %% We'll never delete the current file, so right is never undefined + true = Right =/= undefined, %% ASSERTION + true = ets:update_element(FileSummaryEts, Right, + {#file_summary.left, Left}), + %% ensure the double linked list is maintained + true = case Left of + undefined -> true; %% File is the eldest file (left-most) + _ -> ets:update_element(FileSummaryEts, Left, + {#file_summary.right, Right}) + end, + true = ets:delete(FileSummaryEts, File), + ok. + %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- -gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> - [SrcObj = #file_summary { - readers = SrcReaders, - left = DstFile, - file_size = SrcFileSize, - locked = true }] = ets:lookup(FileSummaryEts, SrcFile), - [DstObj = #file_summary { - readers = DstReaders, - right = SrcFile, - file_size = DstFileSize, - locked = true }] = ets:lookup(FileSummaryEts, DstFile), - - case SrcReaders =:= 0 andalso DstReaders =:= 0 of - true -> TotalValidData = combine_files(SrcObj, DstObj, State), - %% don't update dest.right, because it could be - %% changing at the same time - true = ets:update_element( - FileSummaryEts, DstFile, - [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.file_size, TotalValidData}]), - SrcFileSize + DstFileSize - TotalValidData; - false -> concurrent_readers - end. - -combine_files(#file_summary { file = Source, - valid_total_size = SourceValid, - left = Destination }, - #file_summary { file = Destination, - valid_total_size = DestinationValid, - right = Source }, - State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> - SourceName = filenum_to_name(Source), - DestinationName = filenum_to_name(Destination), +has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> + [#file_summary { locked = true, readers = Count }] = + ets:lookup(FileSummaryEts, File), + Count /= 0. + +combine_files(Source, Destination, + State = #gc_state { file_summary_ets = FileSummaryEts, + dir = Dir, + msg_store = Server }) -> + [#file_summary { + readers = 0, + left = Destination, + valid_total_size = SourceValid, + file_size = SourceFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Source), + [#file_summary { + readers = 0, + right = Source, + valid_total_size = DestinationValid, + file_size = DestinationFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Destination), + + SourceName = filenum_to_name(Source), + DestinationName = filenum_to_name(Destination), {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_AHEAD_MODE), {ok, DestinationHdl} = open_file(Dir, DestinationName, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ExpectedSize = SourceValid + DestinationValid, + TotalValidData = SourceValid + DestinationValid, %% if DestinationValid =:= DestinationContiguousTop then we don't %% need a tmp file %% if they're not equal, then we need to write out everything past @@ -1577,7 +1601,7 @@ combine_files(#file_summary { file = Source, drop_contiguous_block_prefix(DestinationWorkList), case DestinationWorkListTail of [] -> ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize); + DestinationHdl, DestinationContiguousTop, TotalValidData); _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE), ok = copy_messages( @@ -1591,7 +1615,7 @@ combine_files(#file_summary { file = Source, %% Destination and copy from Tmp back to the end {ok, 0} = file_handle_cache:position(TmpHdl, 0), ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), + DestinationHdl, DestinationContiguousTop, TotalValidData), {ok, TmpSize} = file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), %% position in DestinationHdl should now be DestinationValid @@ -1599,14 +1623,36 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:delete(TmpHdl) end, {SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State), - ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + ok = copy_messages(SourceWorkList, DestinationValid, TotalValidData, SourceHdl, DestinationHdl, Destination, State), %% tidy up ok = file_handle_cache:close(DestinationHdl), ok = file_handle_cache:delete(SourceHdl), - ExpectedSize. -load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> + %% don't update dest.right, because it could be changing at the + %% same time + true = ets:update_element( + FileSummaryEts, Destination, + [{#file_summary.valid_total_size, TotalValidData}, + {#file_summary.file_size, TotalValidData}]), + + Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData, + gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}). + +delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, + dir = Dir, + msg_store = Server }) -> + [#file_summary { valid_total_size = 0, + locked = true, + file_size = FileSize, + readers = 0 }] = ets:lookup(FileSummaryEts, File), + {[], 0} = load_and_vacuum_message_file(File, State), + ok = file:delete(form_filename(Dir, filenum_to_name(File))), + gen_server2:cast(Server, {delete_file, File, FileSize}). + +load_and_vacuum_message_file(File, #gc_state { dir = Dir, + index_module = Index, + index_state = IndexState }) -> %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), @@ -1627,7 +1673,8 @@ load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> + Destination, #gc_state { index_module = Index, + index_state = IndexState }) -> Copy = fun ({BlockStart, BlockEnd}) -> BSize = BlockEnd - BlockStart, {ok, BlockStart} = diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index a7855bbf..cd9fd497 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,20 +33,16 @@ -behaviour(gen_server2). --export([start_link/4, gc/3, no_readers/2, stop/1]). +-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]). -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). --record(gcstate, - {dir, - index_state, - index_module, - parent, - file_summary_ets, - scheduled +-record(state, + { pending_no_readers, + msg_store_state }). -include("rabbit.hrl"). @@ -55,10 +51,12 @@ -ifdef(use_specs). --spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) -> +-spec(start_link/1 :: (rabbit_msg_store:gc_state()) -> rabbit_types:ok_pid_or_error()). --spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). --spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(combine/3 :: (pid(), rabbit_msg_store:file_num(), + rabbit_msg_store:file_num()) -> 'ok'). +-spec(delete/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok'). +-spec(no_readers/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok'). -spec(stop/1 :: (pid()) -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -66,13 +64,15 @@ %%---------------------------------------------------------------------------- -start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> - gen_server2:start_link( - ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts], - [{timeout, infinity}]). +start_link(MsgStoreState) -> + gen_server2:start_link(?MODULE, [MsgStoreState], + [{timeout, infinity}]). -gc(Server, Source, Destination) -> - gen_server2:cast(Server, {gc, Source, Destination}). +combine(Server, Source, Destination) -> + gen_server2:cast(Server, {combine, Source, Destination}). + +delete(Server, File) -> + gen_server2:cast(Server, {delete, File}). no_readers(Server, File) -> gen_server2:cast(Server, {no_readers, File}). @@ -85,16 +85,11 @@ set_maximum_since_use(Pid, Age) -> %%---------------------------------------------------------------------------- -init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> +init([MsgStoreState]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - {ok, #gcstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - parent = Parent, - file_summary_ets = FileSummaryEts, - scheduled = undefined }, - hibernate, + {ok, #state { pending_no_readers = dict:new(), + msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; @@ -103,18 +98,23 @@ prioritise_cast(_Msg, _State) -> 0. handle_call(stop, _From, State) -> {stop, normal, ok, State}. -handle_cast({gc, Source, Destination}, - State = #gcstate { scheduled = undefined }) -> - {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }), - hibernate}; +handle_cast({combine, Source, Destination}, State) -> + {noreply, attempt_action(combine, [Source, Destination], State), hibernate}; -handle_cast({no_readers, File}, - State = #gcstate { scheduled = {Source, Destination} }) - when File =:= Source orelse File =:= Destination -> - {noreply, attempt_gc(State), hibernate}; +handle_cast({delete, File}, State) -> + {noreply, attempt_action(delete, [File], State), hibernate}; -handle_cast({no_readers, _File}, State) -> - {noreply, State, hibernate}; +handle_cast({no_readers, File}, + State = #state { pending_no_readers = Pending }) -> + {noreply, case dict:find(File, Pending) of + error -> + State; + {ok, {Action, Files}} -> + Pending1 = dict:erase(File, Pending), + attempt_action( + Action, Files, + State #state { pending_no_readers = Pending1 }) + end, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -129,16 +129,18 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -attempt_gc(State = #gcstate { dir = Dir, - index_state = IndexState, - index_module = Index, - parent = Parent, - file_summary_ets = FileSummaryEts, - scheduled = {Source, Destination} }) -> - case rabbit_msg_store:gc(Source, Destination, - {FileSummaryEts, Dir, Index, IndexState}) of - concurrent_readers -> State; - Reclaimed -> ok = rabbit_msg_store:gc_done( - Parent, Reclaimed, Source, Destination), - State #gcstate { scheduled = undefined } +attempt_action(Action, Files, + State = #state { pending_no_readers = Pending, + msg_store_state = MsgStoreState }) -> + case [File || File <- Files, + rabbit_msg_store:has_readers(File, MsgStoreState)] of + [] -> do_action(Action, Files, MsgStoreState), + State; + [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending), + State #state { pending_no_readers = Pending1 } end. + +do_action(combine, [Source, Destination], MsgStoreState) -> + rabbit_msg_store:combine_files(Source, Destination, MsgStoreState); +do_action(delete, [File], MsgStoreState) -> + rabbit_msg_store:delete_file(File, MsgStoreState). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 435fdfac..f2a65eeb 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -962,9 +962,6 @@ test_user_management() -> control_action(list_permissions, [], [{"-p", "/testhost"}]), {error, {invalid_regexp, _, _}} = control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), - {error, {invalid_scope, _}} = - control_action(set_permissions, ["guest", "foo", ".*", ".*"], - [{"-s", "cilent"}]), %% user creation ok = control_action(add_user, ["foo", "bar"]), @@ -987,9 +984,7 @@ test_user_management() -> ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], [{"-p", "/testhost"}]), ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], - [{"-p", "/testhost"}, {"-s", "client"}]), - ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], - [{"-p", "/testhost"}, {"-s", "all"}]), + [{"-p", "/testhost"}]), ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), ok = control_action(list_user_permissions, ["foo"]), @@ -1297,7 +1292,7 @@ info_action(Command, Args, CheckVHost) -> {bad_argument, dummy} = control_action(Command, ["dummy"]), ok. -default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}]. +default_options() -> [{"-p", "/"}, {"-q", "false"}]. expand_options(As, Bs) -> lists:foldl(fun({K, _}=A, R) -> |