summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-05-10 02:55:16 +0300
committerGitHub <noreply@github.com>2019-05-10 02:55:16 +0300
commit5c80bea709f4c89db3d8652f4a3e7d8421efb76e (patch)
treef110123ac7c5966633a104fad72e5b44de71286a
parent43dfd5021b0fb012316ec6e9ad388f2cae14ef02 (diff)
parenta8deb7500646b2b73d15cedbe4d3ea4d3dd07b2b (diff)
downloadrabbitmq-server-git-3.8.0-beta.4.tar.gz
Merge pull request #2001 from rabbitmq/rabbitmq-server-2000v3.8.0-beta.4
Move check for active readers to message store GC action function
-rw-r--r--src/rabbit_msg_store.erl73
-rw-r--r--src/rabbit_msg_store_gc.erl16
-rw-r--r--test/queue_parallel_SUITE.erl27
3 files changed, 80 insertions, 36 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 1ec5fe9e1a..e3b23cfbca 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -23,7 +23,7 @@
client_ref/1, close_all_indicated/1,
write/3, write_flow/3, read/2, contains/2, remove/2]).
--export([set_maximum_since_use/2, has_readers/2, combine_files/3,
+-export([set_maximum_since_use/2, combine_files/3,
delete_file/2]). %% internal
-export([transform_dir/3, force_recovery/2]). %% upgrade
@@ -1970,33 +1970,48 @@ cleanup_after_file_deletion(File,
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
--spec has_readers(non_neg_integer(), gc_state()) -> boolean().
-
-has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
- [#file_summary { locked = true, readers = Count }] =
- ets:lookup(FileSummaryEts, File),
- Count /= 0.
-
-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
- deletion_thunk().
+ {ok, deletion_thunk()} | {defer, non_neg_integer()}.
combine_files(Source, Destination,
- State = #gc_state { file_summary_ets = FileSummaryEts,
- file_handles_ets = FileHandlesEts,
- dir = Dir,
- msg_store = Server }) ->
- [#file_summary {
+ State = #gc_state { file_summary_ets = FileSummaryEts }) ->
+ [#file_summary{locked = true} = SourceSummary] =
+ ets:lookup(FileSummaryEts, Source),
+
+ [#file_summary{locked = true} = DestinationSummary] =
+ ets:lookup(FileSummaryEts, Destination),
+
+ case {SourceSummary, DestinationSummary} of
+ {#file_summary{readers = 0}, #file_summary{readers = 0}} ->
+ {ok, do_combine_files(SourceSummary, DestinationSummary,
+ Source, Destination, State)};
+ _ ->
+ rabbit_log:debug("Asked to combine files ~p and ~p but they have active readers. Deferring.",
+ [Source, Destination]),
+ DeferredFiles = [FileSummary#file_summary.file
+ || FileSummary <- [SourceSummary, DestinationSummary],
+ FileSummary#file_summary.readers /= 0],
+ {defer, DeferredFiles}
+ end.
+
+do_combine_files(SourceSummary, DestinationSummary,
+ Source, Destination,
+ State = #gc_state { file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
+ dir = Dir,
+ msg_store = Server }) ->
+ #file_summary {
readers = 0,
left = Destination,
valid_total_size = SourceValid,
file_size = SourceFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Source),
- [#file_summary {
+ locked = true } = SourceSummary,
+ #file_summary {
readers = 0,
right = Source,
valid_total_size = DestinationValid,
file_size = DestinationFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Destination),
+ locked = true } = DestinationSummary,
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
@@ -2053,22 +2068,30 @@ combine_files(Source, Destination,
{#file_summary.file_size, TotalValidData}]),
Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
+ rabbit_log:debug("Combined segment files number ~p (source) and ~p (destination), reclaimed ~p bytes",
+ [Source, Destination, Reclaimed]),
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
safe_file_delete_fun(Source, Dir, FileHandlesEts).
--spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk().
+-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}.
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
dir = Dir,
msg_store = Server }) ->
- [#file_summary { valid_total_size = 0,
- locked = true,
- file_size = FileSize,
- readers = 0 }] = ets:lookup(FileSummaryEts, File),
- {[], 0} = load_and_vacuum_message_file(File, State),
- gen_server2:cast(Server, {delete_file, File, FileSize}),
- safe_file_delete_fun(File, Dir, FileHandlesEts).
+ case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { valid_total_size = 0,
+ locked = true,
+ file_size = FileSize,
+ readers = 0 }] ->
+ {[], 0} = load_and_vacuum_message_file(File, State),
+ gen_server2:cast(Server, {delete_file, File, FileSize}),
+ {ok, safe_file_delete_fun(File, Dir, FileHandlesEts)};
+ [#file_summary{readers = Readers}] when Readers > 0 ->
+ rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.",
+ [File]),
+ {defer, [File]}
+ end.
load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index dfadd5586d..60702b5b95 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -119,15 +119,13 @@ attempt_action(Action, Files,
State = #state { pending_no_readers = Pending,
on_action = Thunks,
msg_store_state = MsgStoreState }) ->
- case [File || File <- Files,
- rabbit_msg_store:has_readers(File, MsgStoreState)] of
- [] -> State #state {
- on_action = lists:filter(
- fun (Thunk) -> not Thunk() end,
- [do_action(Action, Files, MsgStoreState) |
- Thunks]) };
- [File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending),
- State #state { pending_no_readers = Pending1 }
+ case do_action(Action, Files, MsgStoreState) of
+ {ok, OkThunk} ->
+ State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
+ [OkThunk | Thunks])};
+ {defer, [File | _]} ->
+ Pending1 = maps:put(File, {Action, Files}, Pending),
+ State #state { pending_no_readers = Pending1 }
end.
do_action(combine, [Source, Destination], MsgStoreState) ->
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index 632a314d21..f8a039dc65 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -60,8 +60,10 @@ groups() ->
[
{parallel_tests, [],
[
- {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
- {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
+ {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds,
+ trigger_message_store_compaction]},
+ {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds,
+ trigger_message_store_compaction]},
{quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
{quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
{quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
@@ -327,6 +329,27 @@ subscribe_and_multiple_ack(Config) ->
rabbit_ct_client_helpers:close_channel(Ch),
ok.
+trigger_message_store_compaction(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ N = 12000,
+ [publish(Ch, QName, [binary:copy(<<"a">>, 5000)]) || _ <- lists:seq(1, N)],
+ wait_for_messages(Config, [[QName, <<"12000">>, <<"12000">>, <<"0">>]]),
+
+ AllDTags = rabbit_ct_client_helpers:consume_without_acknowledging(Ch, QName, N),
+ ToAck = lists:filter(fun (I) -> I > 500 andalso I < 11200 end, AllDTags),
+
+ [amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = Tag,
+ multiple = false}) || Tag <- ToAck],
+
+ %% give compaction a moment to start in and finish
+ timer:sleep(5000),
+ amqp_channel:cast(Ch, #'queue.purge'{queue = QName}),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
+
subscribe_and_requeue_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),