summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-10-26 12:36:41 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-10-26 12:36:41 +0100
commit4c41d35a0c0c80c7d58790a7cb8bcfa0a22bbee1 (patch)
treed63779e33ab24285524e0236214b8871832b90fb
parentdbea323eaf57a5a4408e1fb237e6e381d3df79bd (diff)
parent2041124fd94b1011d3f968e52bb8da81918ee33d (diff)
downloadrabbitmq-server-4c41d35a0c0c80c7d58790a7cb8bcfa0a22bbee1.tar.gz
Merging bug 23429 into default
-rw-r--r--docs/rabbitmqctl.1.xml12
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_access_control.erl75
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_msg_store.erl359
-rw-r--r--src/rabbit_msg_store_gc.erl96
-rw-r--r--src/rabbit_tests.erl9
7 files changed, 283 insertions, 276 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 3b7244c7..acb99bc8 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -589,7 +589,7 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">-s <replaceable>scope</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -597,16 +597,6 @@
<listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem>
</varlistentry>
<varlistentry>
- <term>scope</term>
- <listitem><para>Scope of the permissions: either
- <command>client</command> (the default) or
- <command>all</command>. This determines whether
- permissions are checked for server-generated resource
- names (<command>all</command>) or only for
- client-specified resource names
- (<command>client</command>).</para></listitem>
- </varlistentry>
- <varlistentry>
<term>user</term>
<listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem>
</varlistentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index ae672fc9..c1c9bd65 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -30,7 +30,7 @@
%%
-record(user, {username, password, is_admin}).
--record(permission, {scope, configure, write, read}).
+-record(permission, {configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 73fd6f0e..85452abf 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -38,7 +38,7 @@
-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
clear_admin/1, list_users/0, lookup_user/1]).
-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
--export([set_permissions/5, set_permissions/6, clear_permissions/2,
+-export([set_permissions/5, clear_permissions/2,
list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
list_user_vhost_permissions/2]).
@@ -52,9 +52,6 @@
-type(username() :: binary()).
-type(password() :: binary()).
-type(regexp() :: binary()).
--type(scope() :: binary()).
--type(scope_atom() :: 'client' | 'all').
-
-spec(check_login/2 ::
(binary(), binary()) -> rabbit_types:user() |
rabbit_types:channel_exit()).
@@ -82,21 +79,15 @@
-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
regexp(), regexp()) -> 'ok').
--spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(),
- regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
-spec(list_permissions/0 ::
- () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(),
- scope_atom()}]).
+ () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
-spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(),
- scope_atom()}]).
+ (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
-spec(list_user_permissions/1 ::
- (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(),
- scope_atom()}]).
+ (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
-spec(list_user_vhost_permissions/2 ::
- (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(),
- scope_atom()}]).
+ (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]).
-endif.
@@ -188,20 +179,15 @@ check_resource_access(Username,
[] ->
false;
[#user_permission{permission = P}] ->
- case {Name, P} of
- {<<"amq.gen",_/binary>>, #permission{scope = client}} ->
- true;
- _ ->
- PermRegexp =
- case element(permission_index(Permission), P) of
- %% <<"^$">> breaks Emacs' erlang mode
- <<"">> -> <<$^, $$>>;
- RE -> RE
- end,
- case re:run(Name, PermRegexp, [{capture, none}]) of
- match -> true;
- nomatch -> false
- end
+ PermRegexp =
+ case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
+ case re:run(Name, PermRegexp, [{capture, none}]) of
+ match -> true;
+ nomatch -> false
end
end,
if Res -> ok;
@@ -334,7 +320,7 @@ internal_delete_vhost(VHostPath) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list(VHostPath)),
- lists:foreach(fun ({Username, _, _, _, _}) ->
+ lists:foreach(fun ({Username, _, _, _}) ->
ok = clear_permissions(Username, VHostPath)
end,
list_vhost_permissions(VHostPath)),
@@ -355,16 +341,7 @@ validate_regexp(RegexpBin) ->
end.
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
- set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm,
- WritePerm, ReadPerm).
-
-set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
- Scope = case ScopeBin of
- <<"client">> -> client;
- <<"all">> -> all;
- _ -> throw({error, {invalid_scope, ScopeBin}})
- end,
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
@@ -374,7 +351,6 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer
username = Username,
virtual_host = VHostPath},
permission = #permission{
- scope = Scope,
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}},
@@ -393,35 +369,34 @@ clear_permissions(Username, VHostPath) ->
end)).
list_permissions() ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
- {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
list_permissions(match_user_vhost('_', '_'))].
list_vhost_permissions(VHostPath) ->
- [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
- {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
list_permissions(rabbit_misc:with_vhost(
VHostPath, match_user_vhost('_', VHostPath)))].
list_user_permissions(Username) ->
- [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
- {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
list_permissions(rabbit_misc:with_user(
Username, match_user_vhost(Username, '_')))].
list_user_vhost_permissions(Username, VHostPath) ->
- [{ConfigurePerm, WritePerm, ReadPerm, Scope} ||
- {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ [{ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, _, ConfigurePerm, WritePerm, ReadPerm} <-
list_permissions(rabbit_misc:with_user_and_vhost(
Username, VHostPath,
match_user_vhost(Username, VHostPath)))].
list_permissions(QueryThunk) ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
#user_permission{user_vhost = #user_vhost{username = Username,
virtual_host = VHostPath},
- permission = #permission{ scope = Scope,
- configure = ConfigurePerm,
+ permission = #permission{ configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}} <-
%% TODO: use dirty ops instead
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 8facaf16..6b212745 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -39,7 +39,6 @@
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
--define(SCOPE_OPT, "-s").
%%----------------------------------------------------------------------------
@@ -67,7 +66,7 @@ start() ->
{[Command0 | Args], Opts} =
rabbit_misc:get_options(
[{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr},
- {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}],
+ {option, ?VHOST_OPT, "/"}],
FullCommand),
Opts1 = lists:map(fun({K, V}) ->
case K of
@@ -289,10 +288,9 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Scope = proplists:get_value(?SCOPE_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
call(Node, {rabbit_access_control, set_permissions,
- [Scope, Username, VHost, CPerm, WPerm, RPerm]});
+ [Username, VHost, CPerm, WPerm, RPerm]});
action(clear_permissions, Node, [Username], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 66cc06cf..b9f3e1a3 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -37,7 +37,8 @@
client_init/2, client_terminate/2, client_delete_and_terminate/3,
write/4, read/3, contains/2, remove/2, release/2, sync/3]).
--export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
+-export([sync/1, set_maximum_since_use/2,
+ has_readers/2, combine_files/3, delete_file/2]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
@@ -74,7 +75,6 @@
sum_valid_data, %% sum of valid data in all files
sum_file_size, %% sum of file sizes
pending_gc_completion, %% things to do once GC completes
- gc_active, %% is the GC currently working?
gc_pid, %% pid of our GC
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
@@ -100,10 +100,27 @@
-record(file_summary,
{file, valid_total_size, left, right, file_size, locked, readers}).
+-record(gc_state,
+ { dir,
+ index_module,
+ index_state,
+ file_summary_ets,
+ msg_store
+ }).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([gc_state/0, file_num/0]).
+
+-opaque(gc_state() :: #gc_state { dir :: file:filename(),
+ index_module :: atom(),
+ index_state :: any(),
+ file_summary_ets :: ets:tid(),
+ msg_store :: server()
+ }).
+
-type(server() :: pid() | atom()).
-type(file_num() :: non_neg_integer()).
-type(client_msstate() :: #client_msstate {
@@ -138,12 +155,11 @@
-spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok').
-spec(sync/1 :: (server()) -> 'ok').
--spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
- 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
- {ets:tid(), file:filename(), atom(), any()}) ->
- 'concurrent_readers' | non_neg_integer()).
+-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
+-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
+ non_neg_integer()).
+-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> non_neg_integer()).
-endif.
@@ -375,9 +391,6 @@ sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
sync(Server) ->
gen_server2:cast(Server, sync).
-gc_done(Server, Reclaimed, Source, Destination) ->
- gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}).
-
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
@@ -482,9 +495,14 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
read_from_disk(MsgLocation, CState1, DedupCacheEts),
Release(), %% this MUST NOT fail with badarg
{{ok, Msg}, CState2};
- MsgLocation -> %% different file!
+ #msg_location {} = MsgLocation -> %% different file!
Release(), %% this MUST NOT fail with badarg
- client_read1(Server, MsgLocation, Defer, CState)
+ client_read1(Server, MsgLocation, Defer, CState);
+ not_found -> %% it seems not to exist. Defer, just to be sure.
+ try Release() %% this can badarg, same as locked case, above
+ catch error:badarg -> ok
+ end,
+ Defer()
end
end.
@@ -547,8 +565,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
sync_timer_ref = undefined,
sum_valid_data = 0,
sum_file_size = 0,
- pending_gc_completion = [],
- gc_active = false,
+ pending_gc_completion = orddict:new(),
gc_pid = undefined,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
@@ -570,8 +587,13 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
{ok, Offset} = file_handle_cache:position(CurHdl, Offset),
ok = file_handle_cache:truncate(CurHdl),
- {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule,
- FileSummaryEts),
+ {ok, GCPid} = rabbit_msg_store_gc:start_link(
+ #gc_state { dir = Dir,
+ index_module = IndexModule,
+ index_state = IndexState,
+ file_summary_ets = FileSummaryEts,
+ msg_store = self()
+ }),
{ok, maybe_compact(
State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }),
@@ -588,10 +610,11 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- sync -> 8;
- {gc_done, _Reclaimed, _Source, _Destination} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- _ -> 0
+ sync -> 8;
+ {combine_files, _Source, _Destination, _Reclaimed} -> 8;
+ {delete_file, _File, _Reclaimed} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ _ -> 0
end.
handle_call(successfully_recovered_state, _From, State) ->
@@ -686,37 +709,23 @@ handle_cast({sync, Guids, K},
handle_cast(sync, State) ->
noreply(internal_sync(State));
-handle_cast({gc_done, Reclaimed, Src, Dst},
+handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
- gc_active = {Src, Dst},
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts }) ->
- %% GC done, so now ensure that any clients that have open fhs to
- %% those files close them before using them again. This has to be
- %% done here (given it's done in the msg_store, and not the gc),
- %% and not when starting up the GC, because if done when starting
- %% up the GC, the client could find the close, and close and
- %% reopen the fh, whilst the GC is waiting for readers to
- %% disappear, before it's actually done the GC.
- true = mark_handle_to_close(FileHandlesEts, Src),
- true = mark_handle_to_close(FileHandlesEts, Dst),
- %% we always move data left, so Src has gone and was on the
- %% right, so need to make dest = source.right.left, and also
- %% dest.right = source.right
- [#file_summary { left = Dst,
- right = SrcRight,
- locked = true,
- readers = 0 }] = ets:lookup(FileSummaryEts, Src),
- %% this could fail if SrcRight =:= undefined
- ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}),
- true = ets:update_element(FileSummaryEts, Dst,
- [{#file_summary.locked, false},
- {#file_summary.right, SrcRight}]),
- true = ets:delete(FileSummaryEts, Src),
- noreply(
- maybe_compact(run_pending(
- State #msstate { sum_file_size = SumFileSize - Reclaimed,
- gc_active = false })));
+ ok = cleanup_after_file_deletion(Source, State),
+ %% see comment in cleanup_after_file_deletion
+ true = mark_handle_to_close(FileHandlesEts, Destination),
+ true = ets:update_element(FileSummaryEts, Destination,
+ {#file_summary.locked, false}),
+ State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
+ noreply(maybe_compact(run_pending([Source, Destination], State1)));
+
+handle_cast({delete_file, File, Reclaimed},
+ State = #msstate { sum_file_size = SumFileSize }) ->
+ ok = cleanup_after_file_deletion(File, State),
+ State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
+ noreply(maybe_compact(run_pending([File], State1)));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -867,7 +876,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
ets:lookup(FileSummaryEts, File),
case Locked of
true -> add_to_pending_gc_completion({read, Guid, From},
- State);
+ File, State);
false -> {Msg, State1} =
read_from_disk(MsgLoc, State, DedupCacheEts),
gen_server2:reply(From, {ok, Msg}),
@@ -897,19 +906,18 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
{Msg, State1}.
-contains_message(Guid, From, State = #msstate { gc_active = GCActive }) ->
+contains_message(Guid, From,
+ State = #msstate { pending_gc_completion = Pending }) ->
case index_lookup_positive_ref_count(Guid, State) of
not_found ->
gen_server2:reply(From, false),
State;
#msg_location { file = File } ->
- case GCActive of
- {A, B} when File =:= A orelse File =:= B ->
- add_to_pending_gc_completion(
- {contains, Guid, From}, State);
- _ ->
- gen_server2:reply(From, true),
- State
+ case orddict:is_key(File, Pending) of
+ true -> add_to_pending_gc_completion(
+ {contains, Guid, From}, File, State);
+ false -> gen_server2:reply(From, true),
+ State
end
end.
@@ -928,7 +936,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true } ] ->
- add_to_pending_gc_completion({remove, Guid}, State);
+ add_to_pending_gc_completion({remove, Guid}, File, State);
[#file_summary {}] ->
ok = Dec(),
[_] = ets:update_counter(
@@ -944,20 +952,25 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
end.
add_to_pending_gc_completion(
- Op, State = #msstate { pending_gc_completion = Pending }) ->
- State #msstate { pending_gc_completion = [Op | Pending] }.
-
-run_pending(State = #msstate { pending_gc_completion = [] }) ->
- State;
-run_pending(State = #msstate { pending_gc_completion = Pending }) ->
- State1 = State #msstate { pending_gc_completion = [] },
- lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
+ Op, File, State = #msstate { pending_gc_completion = Pending }) ->
+ State #msstate { pending_gc_completion =
+ rabbit_misc:orddict_cons(File, Op, Pending) }.
-run_pending({read, Guid, From}, State) ->
+run_pending(Files, State) ->
+ lists:foldl(
+ fun (File, State1 = #msstate { pending_gc_completion = Pending }) ->
+ Pending1 = orddict:erase(File, Pending),
+ lists:foldl(
+ fun run_pending_action/2,
+ State1 #msstate { pending_gc_completion = Pending1 },
+ lists:reverse(orddict:fetch(File, Pending)))
+ end, State, Files).
+
+run_pending_action({read, Guid, From}, State) ->
read_message(Guid, From, State);
-run_pending({contains, Guid, From}, State) ->
+run_pending_action({contains, Guid, From}, State) ->
contains_message(Guid, From, State);
-run_pending({remove, Guid}, State) ->
+run_pending_action({remove, Guid}, State) ->
remove_message(Guid, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
@@ -969,6 +982,10 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
+orddict_store(Key, Val, Dict) ->
+ false = orddict:is_key(Key, Dict),
+ orddict:store(Key, Val, Dict).
+
%%----------------------------------------------------------------------------
%% file helper functions
%%----------------------------------------------------------------------------
@@ -1429,12 +1446,12 @@ maybe_roll_to_new_file(
maybe_roll_to_new_file(_, State) ->
State.
-maybe_compact(State = #msstate { sum_valid_data = SumValid,
- sum_file_size = SumFileSize,
- gc_active = false,
- gc_pid = GCPid,
- file_summary_ets = FileSummaryEts,
- file_size_limit = FileSizeLimit })
+maybe_compact(State = #msstate { sum_valid_data = SumValid,
+ sum_file_size = SumFileSize,
+ gc_pid = GCPid,
+ pending_gc_completion = Pending,
+ file_summary_ets = FileSummaryEts,
+ file_size_limit = FileSizeLimit })
when (SumFileSize > 2 * FileSizeLimit andalso
(SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
%% TODO: the algorithm here is sub-optimal - it may result in a
@@ -1443,27 +1460,30 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
'$end_of_table' ->
State;
First ->
- case find_files_to_gc(FileSummaryEts, FileSizeLimit,
- ets:lookup(FileSummaryEts, First)) of
+ case find_files_to_combine(FileSummaryEts, FileSizeLimit,
+ ets:lookup(FileSummaryEts, First)) of
not_found ->
State;
{Src, Dst} ->
+ Pending1 = orddict_store(Dst, [],
+ orddict_store(Src, [], Pending)),
State1 = close_handle(Src, close_handle(Dst, State)),
true = ets:update_element(FileSummaryEts, Src,
{#file_summary.locked, true}),
true = ets:update_element(FileSummaryEts, Dst,
{#file_summary.locked, true}),
- ok = rabbit_msg_store_gc:gc(GCPid, Src, Dst),
- State1 #msstate { gc_active = {Src, Dst} }
+ ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst),
+ State1 #msstate { pending_gc_completion = Pending1 }
end
end;
maybe_compact(State) ->
State.
-find_files_to_gc(FileSummaryEts, FileSizeLimit,
- [#file_summary { file = Dst,
- valid_total_size = DstValid,
- right = Src }]) ->
+find_files_to_combine(FileSummaryEts, FileSizeLimit,
+ [#file_summary { file = Dst,
+ valid_total_size = DstValid,
+ right = Src,
+ locked = DstLocked }]) ->
case Src of
undefined ->
not_found;
@@ -1471,13 +1491,16 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit,
[#file_summary { file = Src,
valid_total_size = SrcValid,
left = Dst,
- right = SrcRight }] = Next =
+ right = SrcRight,
+ locked = SrcLocked }] = Next =
ets:lookup(FileSummaryEts, Src),
case SrcRight of
undefined -> not_found;
- _ -> case DstValid + SrcValid =< FileSizeLimit of
+ _ -> case (DstValid + SrcValid =< FileSizeLimit) andalso
+ (DstValid > 0) andalso (SrcValid > 0) andalso
+ not (DstLocked orelse SrcLocked) of
true -> {Src, Dst};
- false -> find_files_to_gc(
+ false -> find_files_to_combine(
FileSummaryEts, FileSizeLimit, Next)
end
end
@@ -1486,85 +1509,86 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit,
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
State;
delete_file_if_empty(File, State = #msstate {
- dir = Dir,
- sum_file_size = SumFileSize,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts }) ->
+ gc_pid = GCPid,
+ file_summary_ets = FileSummaryEts,
+ pending_gc_completion = Pending }) ->
[#file_summary { valid_total_size = ValidData,
- left = Left,
- right = Right,
- file_size = FileSize,
locked = false }] =
ets:lookup(FileSummaryEts, File),
case ValidData of
- %% we should NEVER find the current file in here hence right
- %% should always be a file, not undefined
- 0 -> case {Left, Right} of
- {undefined, _} when Right =/= undefined ->
- %% the eldest file is empty.
- true = ets:update_element(
- FileSummaryEts, Right,
- {#file_summary.left, undefined});
- {_, _} when Right =/= undefined ->
- true = ets:update_element(FileSummaryEts, Right,
- {#file_summary.left, Left}),
- true = ets:update_element(FileSummaryEts, Left,
- {#file_summary.right, Right})
- end,
- true = mark_handle_to_close(FileHandlesEts, File),
- true = ets:delete(FileSummaryEts, File),
- {ok, Messages, FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
- [index_delete(Guid, State) ||
- {Guid, _TotalSize, _Offset} <- Messages],
- State1 = close_handle(File, State),
- ok = file:delete(form_filename(Dir, filenum_to_name(File))),
- State1 #msstate { sum_file_size = SumFileSize - FileSize };
+ 0 -> %% don't delete the file_summary_ets entry for File here
+ %% because we could have readers which need to be able to
+ %% decrement the readers count.
+ true = ets:update_element(FileSummaryEts, File,
+ {#file_summary.locked, true}),
+ ok = rabbit_msg_store_gc:delete(GCPid, File),
+ Pending1 = orddict_store(File, [], Pending),
+ close_handle(File,
+ State #msstate { pending_gc_completion = Pending1 });
_ -> State
end.
+cleanup_after_file_deletion(File,
+ #msstate { file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts }) ->
+ %% Ensure that any clients that have open fhs to the file close
+ %% them before using them again. This has to be done here (given
+ %% it's done in the msg_store, and not the gc), and not when
+ %% starting up the GC, because if done when starting up the GC,
+ %% the client could find the close, and close and reopen the fh,
+ %% whilst the GC is waiting for readers to disappear, before it's
+ %% actually done the GC.
+ true = mark_handle_to_close(FileHandlesEts, File),
+ [#file_summary { left = Left,
+ right = Right,
+ locked = true,
+ readers = 0 }] = ets:lookup(FileSummaryEts, File),
+ %% We'll never delete the current file, so right is never undefined
+ true = Right =/= undefined, %% ASSERTION
+ true = ets:update_element(FileSummaryEts, Right,
+ {#file_summary.left, Left}),
+ %% ensure the double linked list is maintained
+ true = case Left of
+ undefined -> true; %% File is the eldest file (left-most)
+ _ -> ets:update_element(FileSummaryEts, Left,
+ {#file_summary.right, Right})
+ end,
+ true = ets:delete(FileSummaryEts, File),
+ ok.
+
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
-gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
- [SrcObj = #file_summary {
- readers = SrcReaders,
- left = DstFile,
- file_size = SrcFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, SrcFile),
- [DstObj = #file_summary {
- readers = DstReaders,
- right = SrcFile,
- file_size = DstFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, DstFile),
-
- case SrcReaders =:= 0 andalso DstReaders =:= 0 of
- true -> TotalValidData = combine_files(SrcObj, DstObj, State),
- %% don't update dest.right, because it could be
- %% changing at the same time
- true = ets:update_element(
- FileSummaryEts, DstFile,
- [{#file_summary.valid_total_size, TotalValidData},
- {#file_summary.file_size, TotalValidData}]),
- SrcFileSize + DstFileSize - TotalValidData;
- false -> concurrent_readers
- end.
-
-combine_files(#file_summary { file = Source,
- valid_total_size = SourceValid,
- left = Destination },
- #file_summary { file = Destination,
- valid_total_size = DestinationValid,
- right = Source },
- State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
- SourceName = filenum_to_name(Source),
- DestinationName = filenum_to_name(Destination),
+has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
+ [#file_summary { locked = true, readers = Count }] =
+ ets:lookup(FileSummaryEts, File),
+ Count /= 0.
+
+combine_files(Source, Destination,
+ State = #gc_state { file_summary_ets = FileSummaryEts,
+ dir = Dir,
+ msg_store = Server }) ->
+ [#file_summary {
+ readers = 0,
+ left = Destination,
+ valid_total_size = SourceValid,
+ file_size = SourceFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Source),
+ [#file_summary {
+ readers = 0,
+ right = Source,
+ valid_total_size = DestinationValid,
+ file_size = DestinationFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Destination),
+
+ SourceName = filenum_to_name(Source),
+ DestinationName = filenum_to_name(Destination),
{ok, SourceHdl} = open_file(Dir, SourceName,
?READ_AHEAD_MODE),
{ok, DestinationHdl} = open_file(Dir, DestinationName,
?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ExpectedSize = SourceValid + DestinationValid,
+ TotalValidData = SourceValid + DestinationValid,
%% if DestinationValid =:= DestinationContiguousTop then we don't
%% need a tmp file
%% if they're not equal, then we need to write out everything past
@@ -1577,7 +1601,7 @@ combine_files(#file_summary { file = Source,
drop_contiguous_block_prefix(DestinationWorkList),
case DestinationWorkListTail of
[] -> ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize);
+ DestinationHdl, DestinationContiguousTop, TotalValidData);
_ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
{ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE),
ok = copy_messages(
@@ -1591,7 +1615,7 @@ combine_files(#file_summary { file = Source,
%% Destination and copy from Tmp back to the end
{ok, 0} = file_handle_cache:position(TmpHdl, 0),
ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ DestinationHdl, DestinationContiguousTop, TotalValidData),
{ok, TmpSize} =
file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
%% position in DestinationHdl should now be DestinationValid
@@ -1599,14 +1623,36 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:delete(TmpHdl)
end,
{SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State),
- ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ ok = copy_messages(SourceWorkList, DestinationValid, TotalValidData,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
ok = file_handle_cache:close(DestinationHdl),
ok = file_handle_cache:delete(SourceHdl),
- ExpectedSize.
-load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) ->
+ %% don't update dest.right, because it could be changing at the
+ %% same time
+ true = ets:update_element(
+ FileSummaryEts, Destination,
+ [{#file_summary.valid_total_size, TotalValidData},
+ {#file_summary.file_size, TotalValidData}]),
+
+ Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
+ gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}).
+
+delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
+ dir = Dir,
+ msg_store = Server }) ->
+ [#file_summary { valid_total_size = 0,
+ locked = true,
+ file_size = FileSize,
+ readers = 0 }] = ets:lookup(FileSummaryEts, File),
+ {[], 0} = load_and_vacuum_message_file(File, State),
+ ok = file:delete(form_filename(Dir, filenum_to_name(File))),
+ gen_server2:cast(Server, {delete_file, File, FileSize}).
+
+load_and_vacuum_message_file(File, #gc_state { dir = Dir,
+ index_module = Index,
+ index_state = IndexState }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
@@ -1627,7 +1673,8 @@ load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) ->
end, {[], 0}, Messages).
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
+ Destination, #gc_state { index_module = Index,
+ index_state = IndexState }) ->
Copy = fun ({BlockStart, BlockEnd}) ->
BSize = BlockEnd - BlockStart,
{ok, BlockStart} =
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index a7855bbf..cd9fd497 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -33,20 +33,16 @@
-behaviour(gen_server2).
--export([start_link/4, gc/3, no_readers/2, stop/1]).
+-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]).
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/2]).
--record(gcstate,
- {dir,
- index_state,
- index_module,
- parent,
- file_summary_ets,
- scheduled
+-record(state,
+ { pending_no_readers,
+ msg_store_state
}).
-include("rabbit.hrl").
@@ -55,10 +51,12 @@
-ifdef(use_specs).
--spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) ->
+-spec(start_link/1 :: (rabbit_msg_store:gc_state()) ->
rabbit_types:ok_pid_or_error()).
--spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok').
--spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(combine/3 :: (pid(), rabbit_msg_store:file_num(),
+ rabbit_msg_store:file_num()) -> 'ok').
+-spec(delete/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok').
+-spec(no_readers/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok').
-spec(stop/1 :: (pid()) -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -66,13 +64,15 @@
%%----------------------------------------------------------------------------
-start_link(Dir, IndexState, IndexModule, FileSummaryEts) ->
- gen_server2:start_link(
- ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts],
- [{timeout, infinity}]).
+start_link(MsgStoreState) ->
+ gen_server2:start_link(?MODULE, [MsgStoreState],
+ [{timeout, infinity}]).
-gc(Server, Source, Destination) ->
- gen_server2:cast(Server, {gc, Source, Destination}).
+combine(Server, Source, Destination) ->
+ gen_server2:cast(Server, {combine, Source, Destination}).
+
+delete(Server, File) ->
+ gen_server2:cast(Server, {delete, File}).
no_readers(Server, File) ->
gen_server2:cast(Server, {no_readers, File}).
@@ -85,16 +85,11 @@ set_maximum_since_use(Pid, Age) ->
%%----------------------------------------------------------------------------
-init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
+init([MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- {ok, #gcstate { dir = Dir,
- index_state = IndexState,
- index_module = IndexModule,
- parent = Parent,
- file_summary_ets = FileSummaryEts,
- scheduled = undefined },
- hibernate,
+ {ok, #state { pending_no_readers = dict:new(),
+ msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
@@ -103,18 +98,23 @@ prioritise_cast(_Msg, _State) -> 0.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
-handle_cast({gc, Source, Destination},
- State = #gcstate { scheduled = undefined }) ->
- {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }),
- hibernate};
+handle_cast({combine, Source, Destination}, State) ->
+ {noreply, attempt_action(combine, [Source, Destination], State), hibernate};
-handle_cast({no_readers, File},
- State = #gcstate { scheduled = {Source, Destination} })
- when File =:= Source orelse File =:= Destination ->
- {noreply, attempt_gc(State), hibernate};
+handle_cast({delete, File}, State) ->
+ {noreply, attempt_action(delete, [File], State), hibernate};
-handle_cast({no_readers, _File}, State) ->
- {noreply, State, hibernate};
+handle_cast({no_readers, File},
+ State = #state { pending_no_readers = Pending }) ->
+ {noreply, case dict:find(File, Pending) of
+ error ->
+ State;
+ {ok, {Action, Files}} ->
+ Pending1 = dict:erase(File, Pending),
+ attempt_action(
+ Action, Files,
+ State #state { pending_no_readers = Pending1 })
+ end, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -129,16 +129,18 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-attempt_gc(State = #gcstate { dir = Dir,
- index_state = IndexState,
- index_module = Index,
- parent = Parent,
- file_summary_ets = FileSummaryEts,
- scheduled = {Source, Destination} }) ->
- case rabbit_msg_store:gc(Source, Destination,
- {FileSummaryEts, Dir, Index, IndexState}) of
- concurrent_readers -> State;
- Reclaimed -> ok = rabbit_msg_store:gc_done(
- Parent, Reclaimed, Source, Destination),
- State #gcstate { scheduled = undefined }
+attempt_action(Action, Files,
+ State = #state { pending_no_readers = Pending,
+ msg_store_state = MsgStoreState }) ->
+ case [File || File <- Files,
+ rabbit_msg_store:has_readers(File, MsgStoreState)] of
+ [] -> do_action(Action, Files, MsgStoreState),
+ State;
+ [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending),
+ State #state { pending_no_readers = Pending1 }
end.
+
+do_action(combine, [Source, Destination], MsgStoreState) ->
+ rabbit_msg_store:combine_files(Source, Destination, MsgStoreState);
+do_action(delete, [File], MsgStoreState) ->
+ rabbit_msg_store:delete_file(File, MsgStoreState).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 435fdfac..f2a65eeb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -962,9 +962,6 @@ test_user_management() ->
control_action(list_permissions, [], [{"-p", "/testhost"}]),
{error, {invalid_regexp, _, _}} =
control_action(set_permissions, ["guest", "+foo", ".*", ".*"]),
- {error, {invalid_scope, _}} =
- control_action(set_permissions, ["guest", "foo", ".*", ".*"],
- [{"-s", "cilent"}]),
%% user creation
ok = control_action(add_user, ["foo", "bar"]),
@@ -987,9 +984,7 @@ test_user_management() ->
ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
[{"-p", "/testhost"}]),
ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
- [{"-p", "/testhost"}, {"-s", "client"}]),
- ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
- [{"-p", "/testhost"}, {"-s", "all"}]),
+ [{"-p", "/testhost"}]),
ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
ok = control_action(list_user_permissions, ["foo"]),
@@ -1297,7 +1292,7 @@ info_action(Command, Args, CheckVHost) ->
{bad_argument, dummy} = control_action(Command, ["dummy"]),
ok.
-default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}].
+default_options() -> [{"-p", "/"}, {"-q", "false"}].
expand_options(As, Bs) ->
lists:foldl(fun({K, _}=A, R) ->