diff options
author | Michael Klishin <mklishin@pivotal.io> | 2019-05-10 02:55:16 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-10 02:55:16 +0300 |
commit | 5c80bea709f4c89db3d8652f4a3e7d8421efb76e (patch) | |
tree | f110123ac7c5966633a104fad72e5b44de71286a | |
parent | 43dfd5021b0fb012316ec6e9ad388f2cae14ef02 (diff) | |
parent | a8deb7500646b2b73d15cedbe4d3ea4d3dd07b2b (diff) | |
download | rabbitmq-server-git-3.8.0-beta.4.tar.gz |
Merge pull request #2001 from rabbitmq/rabbitmq-server-2000v3.8.0-beta.4
Move check for active readers to message store GC action function
-rw-r--r-- | src/rabbit_msg_store.erl | 73 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 16 | ||||
-rw-r--r-- | test/queue_parallel_SUITE.erl | 27 |
3 files changed, 80 insertions, 36 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 1ec5fe9e1a..e3b23cfbca 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -23,7 +23,7 @@ client_ref/1, close_all_indicated/1, write/3, write_flow/3, read/2, contains/2, remove/2]). --export([set_maximum_since_use/2, has_readers/2, combine_files/3, +-export([set_maximum_since_use/2, combine_files/3, delete_file/2]). %% internal -export([transform_dir/3, force_recovery/2]). %% upgrade @@ -1970,33 +1970,48 @@ cleanup_after_file_deletion(File, %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- --spec has_readers(non_neg_integer(), gc_state()) -> boolean(). - -has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> - [#file_summary { locked = true, readers = Count }] = - ets:lookup(FileSummaryEts, File), - Count /= 0. - -spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) -> - deletion_thunk(). + {ok, deletion_thunk()} | {defer, non_neg_integer()}. combine_files(Source, Destination, - State = #gc_state { file_summary_ets = FileSummaryEts, - file_handles_ets = FileHandlesEts, - dir = Dir, - msg_store = Server }) -> - [#file_summary { + State = #gc_state { file_summary_ets = FileSummaryEts }) -> + [#file_summary{locked = true} = SourceSummary] = + ets:lookup(FileSummaryEts, Source), + + [#file_summary{locked = true} = DestinationSummary] = + ets:lookup(FileSummaryEts, Destination), + + case {SourceSummary, DestinationSummary} of + {#file_summary{readers = 0}, #file_summary{readers = 0}} -> + {ok, do_combine_files(SourceSummary, DestinationSummary, + Source, Destination, State)}; + _ -> + rabbit_log:debug("Asked to combine files ~p and ~p but they have active readers. Deferring.", + [Source, Destination]), + DeferredFiles = [FileSummary#file_summary.file + || FileSummary <- [SourceSummary, DestinationSummary], + FileSummary#file_summary.readers /= 0], + {defer, DeferredFiles} + end. + +do_combine_files(SourceSummary, DestinationSummary, + Source, Destination, + State = #gc_state { file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, + dir = Dir, + msg_store = Server }) -> + #file_summary { readers = 0, left = Destination, valid_total_size = SourceValid, file_size = SourceFileSize, - locked = true }] = ets:lookup(FileSummaryEts, Source), - [#file_summary { + locked = true } = SourceSummary, + #file_summary { readers = 0, right = Source, valid_total_size = DestinationValid, file_size = DestinationFileSize, - locked = true }] = ets:lookup(FileSummaryEts, Destination), + locked = true } = DestinationSummary, SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), @@ -2053,22 +2068,30 @@ combine_files(Source, Destination, {#file_summary.file_size, TotalValidData}]), Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData, + rabbit_log:debug("Combined segment files number ~p (source) and ~p (destination), reclaimed ~p bytes", + [Source, Destination, Reclaimed]), gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}), safe_file_delete_fun(Source, Dir, FileHandlesEts). --spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk(). +-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}. delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, file_handles_ets = FileHandlesEts, dir = Dir, msg_store = Server }) -> - [#file_summary { valid_total_size = 0, - locked = true, - file_size = FileSize, - readers = 0 }] = ets:lookup(FileSummaryEts, File), - {[], 0} = load_and_vacuum_message_file(File, State), - gen_server2:cast(Server, {delete_file, File, FileSize}), - safe_file_delete_fun(File, Dir, FileHandlesEts). + case ets:lookup(FileSummaryEts, File) of + [#file_summary { valid_total_size = 0, + locked = true, + file_size = FileSize, + readers = 0 }] -> + {[], 0} = load_and_vacuum_message_file(File, State), + gen_server2:cast(Server, {delete_file, File, FileSize}), + {ok, safe_file_delete_fun(File, Dir, FileHandlesEts)}; + [#file_summary{readers = Readers}] when Readers > 0 -> + rabbit_log:debug("Asked to delete file ~p but it has active readers. Deferring.", + [File]), + {defer, [File]} + end. load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) -> %% Messages here will be end-of-file at start-of-list diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index dfadd5586d..60702b5b95 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -119,15 +119,13 @@ attempt_action(Action, Files, State = #state { pending_no_readers = Pending, on_action = Thunks, msg_store_state = MsgStoreState }) -> - case [File || File <- Files, - rabbit_msg_store:has_readers(File, MsgStoreState)] of - [] -> State #state { - on_action = lists:filter( - fun (Thunk) -> not Thunk() end, - [do_action(Action, Files, MsgStoreState) | - Thunks]) }; - [File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending), - State #state { pending_no_readers = Pending1 } + case do_action(Action, Files, MsgStoreState) of + {ok, OkThunk} -> + State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end, + [OkThunk | Thunks])}; + {defer, [File | _]} -> + Pending1 = maps:put(File, {Action, Files}, Pending), + State #state { pending_no_readers = Pending1 } end. do_action(combine, [Source, Destination], MsgStoreState) -> diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index 632a314d21..f8a039dc65 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -60,8 +60,10 @@ groups() -> [ {parallel_tests, [], [ - {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, - {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, + {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds, + trigger_message_store_compaction]}, + {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds, + trigger_message_store_compaction]}, {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, {quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]} @@ -327,6 +329,27 @@ subscribe_and_multiple_ack(Config) -> rabbit_ct_client_helpers:close_channel(Ch), ok. +trigger_message_store_compaction(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + N = 12000, + [publish(Ch, QName, [binary:copy(<<"a">>, 5000)]) || _ <- lists:seq(1, N)], + wait_for_messages(Config, [[QName, <<"12000">>, <<"12000">>, <<"0">>]]), + + AllDTags = rabbit_ct_client_helpers:consume_without_acknowledging(Ch, QName, N), + ToAck = lists:filter(fun (I) -> I > 500 andalso I < 11200 end, AllDTags), + + [amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = Tag, + multiple = false}) || Tag <- ToAck], + + %% give compaction a moment to start in and finish + timer:sleep(5000), + amqp_channel:cast(Ch, #'queue.purge'{queue = QName}), + rabbit_ct_client_helpers:close_channel(Ch), + ok. + subscribe_and_requeue_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), |