diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-28 18:05:38 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-28 18:05:38 +0100 |
commit | 45adb5ec32c09f6cc8e105d55d86b9d07ddebb39 (patch) | |
tree | f0b3271873dbc4a0e0dac7127e9991a123460ef9 | |
parent | 46ca7002550b19f8bfe32586ca810c2b719f9261 (diff) | |
parent | 14d29d6bf7a9868465ef7219bc918ebab29b77e4 (diff) | |
download | rabbitmq-server-45adb5ec32c09f6cc8e105d55d86b9d07ddebb39.tar.gz |
merge bug24513 into default
-rw-r--r-- | Makefile | 1 | ||||
-rw-r--r-- | packaging/common/rabbitmq-server.init | 6 | ||||
-rwxr-xr-x | quickcheck | 9 | ||||
-rw-r--r-- | src/mirrored_supervisor_tests.erl | 2 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 6 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 46 | ||||
-rw-r--r-- | src/rabbit_control.erl | 5 | ||||
-rw-r--r-- | src/rabbit_memory_monitor.erl | 20 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 263 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 47 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 2 |
11 files changed, 282 insertions, 125 deletions
@@ -368,3 +368,4 @@ ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" "" endif .PHONY: run-qc + diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index 15fd5d5b..c59af6c1 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -56,8 +56,10 @@ start_rabbitmq () { RETVAL=0 ensure_pid_dir set +e - setsid sh -c "RABBITMQ_PID_FILE=$PID_FILE $DAEMON > \ - ${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err" & + RABBITMQ_PID_FILE=$PID_FILE setsid $DAEMON \ + > "${INIT_LOG_DIR}/startup_log" \ + 2> "${INIT_LOG_DIR}/startup_err" \ + 0<&- & $CONTROL wait $PID_FILE >/dev/null 2>&1 RETVAL=$? set -e @@ -6,15 +6,16 @@ %% A helper to test quickcheck properties on a running broker %% NodeStr is a local broker node name %% ModStr is the module containing quickcheck properties -%% The number of trials is optional -main([NodeStr, ModStr | TrialsStr]) -> +%% TrialsStr is the number of trials +main([NodeStr, ModStr, TrialsStr]) -> {ok, Hostname} = inet:gethostname(), Node = list_to_atom(NodeStr ++ "@" ++ Hostname), Mod = list_to_atom(ModStr), - Trials = lists:map(fun erlang:list_to_integer/1, TrialsStr), + Trials = erlang:list_to_integer(TrialsStr), case rpc:call(Node, code, ensure_loaded, [proper]) of {module, proper} -> - case rpc:call(Node, proper, module, [Mod] ++ Trials) of + case rpc:call(Node, proper, module, + [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of [] -> ok; _ -> quit(1) end; diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index 5e782a08..0900f56f 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -202,7 +202,7 @@ with_sups(Fun, Sups) -> Pids = [begin {ok, Pid} = start_sup(Sup), Pid end || Sup <- Sups], Fun(Pids), [kill(Pid) || Pid <- Pids, is_process_alive(Pid)], - timer:sleep(100), + timer:sleep(500), passed. 
start_sup(Spec) -> diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index d38ecb91..fd03ca85 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -45,11 +45,7 @@ start() -> ok = alarm_handler:add_alarm_handler(?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), - ok = case MemoryWatermark == 0 of - true -> ok; - false -> rabbit_sup:start_restartable_child(vm_memory_monitor, - [MemoryWatermark]) - end, + rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]), ok. stop() -> diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 095202dd..c61184a6 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -34,14 +34,15 @@ -export([initial_state/0, command/1, precondition/2, postcondition/3, next_state/3]). --export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]). +-export([prop_backing_queue_test/0, publish_multiple/1, timeout/2]). -record(state, {bqstate, len, %% int next_seq_id, %% int messages, %% gb_trees of seqid => {msg_props, basic_msg} acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}] - confirms}). %% set of msgid + confirms, %% set of msgid + publishing}).%% int %% Initialise model @@ -51,7 +52,8 @@ initial_state() -> next_seq_id = 0, messages = gb_trees:empty(), acks = [], - confirms = gb_sets:new()}. + confirms = gb_sets:new(), + publishing = 0}. %% Property @@ -112,10 +114,8 @@ qc_publish(#state{bqstate = BQ}) -> expiry = oneof([undefined | lists:seq(1, 10)])}, self(), BQ]}. -qc_publish_multiple(#state{bqstate = BQ}) -> - {call, ?MODULE, publish_multiple, - [qc_message(), #message_properties{}, BQ, - resize(?QUEUE_MAXLEN, pos_integer())]}. +qc_publish_multiple(#state{}) -> + {call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}. 
qc_publish_delivered(#state{bqstate = BQ}) -> {call, ?BQMOD, publish_delivered, @@ -128,8 +128,7 @@ qc_ack(#state{bqstate = BQ, acks = Acks}) -> {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}. qc_requeue(#state{bqstate = BQ, acks = Acks}) -> - {call, ?BQMOD, requeue, - [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. + {call, ?BQMOD, requeue, [rand_choice(proplists:get_keys(Acks)), BQ]}. qc_set_ram_duration_target(#state{bqstate = BQ}) -> {call, ?BQMOD, set_ram_duration_target, @@ -155,6 +154,10 @@ qc_purge(#state{bqstate = BQ}) -> %% Preconditions +%% Create long queues by only allowing publishing +precondition(#state{publishing = Count}, {call, _Mod, Fun, _Arg}) + when Count > 0, Fun /= publish -> + false; precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg}) when Fun =:= ack; Fun =:= requeue -> length(Acks) > 0; @@ -174,6 +177,7 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> #state{len = Len, messages = Messages, confirms = Confirms, + publishing = PublishCount, next_seq_id = NextSeq} = S, MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, NeedsConfirm = @@ -183,21 +187,15 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> len = Len + 1, next_seq_id = NextSeq + 1, messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages), + publishing = {call, erlang, max, [0, {call, erlang, '-', + [PublishCount, 1]}]}, confirms = case eval(NeedsConfirm) of true -> gb_sets:add(MsgId, Confirms); _ -> Confirms end}; -next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) -> - #state{len = Len, messages = Messages} = S, - {S1, Msgs1} = repeat({S, Messages}, - fun ({#state{next_seq_id = NextSeq} = State, Msgs}) -> - {State #state { next_seq_id = NextSeq + 1}, - gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)} - end, Count), - S1#state{bqstate = BQ, - len = Len + Count, - messages = Msgs1}; +next_state(S, _BQ, {call, ?MODULE, 
publish_multiple, [PublishCount]}) -> + S#state{publishing = PublishCount}; next_state(S, Res, {call, ?BQMOD, publish_delivered, @@ -245,7 +243,7 @@ next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> S#state{bqstate = BQ1, acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)}; -next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) -> +next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _V]}) -> #state{messages = Messages, acks = AcksState} = S, BQ1 = {call, erlang, element, [2, Res]}, Messages1 = lists:foldl(fun (AckTag, Msgs) -> @@ -322,12 +320,8 @@ postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> %% Helpers -repeat(Result, _Fun, 0) -> Result; -repeat(Result, Fun, Times) -> repeat(Fun(Result), Fun, Times - 1). - -publish_multiple(Msg, MsgProps, BQ, Count) -> - repeat(BQ, fun(BQ1) -> ?BQMOD:publish(Msg, MsgProps, self(), BQ1) end, - Count). +publish_multiple(_C) -> + ok. timeout(BQ, 0) -> BQ; diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 905e4fd0..66a5ab5b 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -325,7 +325,10 @@ action(trace_off, Node, [], Opts, Inform) -> rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> - Frac = list_to_float(Arg), + Frac = list_to_float(case string:chr(Arg, $.) 
of + 0 -> Arg ++ ".0"; + _ -> Arg + end), Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]), rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]); diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 3deb9580..02f3158f 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -211,17 +211,19 @@ internal_update(State = #state { queue_durations = Durations, queue_duration_sum = Sum, queue_duration_count = Count }) -> MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(), - MemoryRatio = erlang:memory(total) / MemoryLimit, + MemoryRatio = case MemoryLimit > 0.0 of + true -> erlang:memory(total) / MemoryLimit; + false -> infinity + end, DesiredDurationAvg1 = - case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of - true -> + if MemoryRatio =:= infinity -> + 0.0; + MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 -> infinity; - false -> - Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of - true -> Sum + ?SUM_INC_AMOUNT; - false -> Sum - end, - (Sum1 / Count) / MemoryRatio + MemoryRatio < ?SUM_INC_THRESHOLD -> + ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; + true -> + (Sum / Count) / MemoryRatio end, State1 = State #state { desired_duration = DesiredDurationAvg1 }, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index d67c30a3..e6a32b90 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -68,6 +68,7 @@ file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table cur_file_cache_ets, %% tid of current file cache table + flying_ets, %% tid of writes/removes in flight dying_clients, %% set of dying clients clients, %% map of references of all registered clients %% to callbacks @@ -86,7 +87,8 @@ gc_pid, file_handles_ets, file_summary_ets, - cur_file_cache_ets + cur_file_cache_ets, + flying_ets }). 
-record(file_summary, @@ -128,12 +130,13 @@ gc_pid :: pid(), file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid()}). + cur_file_cache_ets :: ets:tid(), + flying_ets :: ets:tid()}). -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). -type(maybe_msg_id_fun() :: - 'undefined' | fun ((gb_set(), 'written' | 'removed') -> any())). + 'undefined' | fun ((gb_set(), 'written' | 'ignored') -> any())). -type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')). -type(deletion_thunk() :: fun (() -> boolean())). @@ -375,6 +378,45 @@ %% performance with many healthy clients and few, if any, dying %% clients, which is the typical case. %% +%% When the msg_store has a backlog (i.e. it has unprocessed messages +%% in its mailbox / gen_server priority queue), a further optimisation +%% opportunity arises: we can eliminate pairs of 'write' and 'remove' +%% from the same client for the same message. A typical occurrence of +%% these is when an empty durable queue delivers persistent messages +%% to ack'ing consumers. The queue will asynchronously ask the +%% msg_store to 'write' such messages, and when they are acknowledged +%% it will issue a 'remove'. That 'remove' may be issued before the +%% msg_store has processed the 'write'. There is then no point going +%% ahead with the processing of that 'write'. +%% +%% To detect this situation a 'flying_ets' table is shared between the +%% clients and the server. The table is keyed on the combination of +%% client (reference) and msg id, and the value represents an +%% integration of all the writes and removes currently "in flight" for +%% that message between the client and server - '+1' means all the +%% writes/removes add up to a single 'write', '-1' to a 'remove', and +%% '0' to nothing. 
(NB: the integration can never add up to more than +%% one 'write' or 'remove' since clients must not write/remove a message +%% more than once without first removing/writing it). +%% +%% Maintaining this table poses two challenges: 1) both the clients +%% and the server access and update the table, which causes +%% concurrency issues, 2) we must ensure that entries do not stay in +%% the table forever, since that would constitute a memory leak. We +%% address the former by carefully modelling all operations as +%% sequences of atomic actions that produce valid results in all +%% possible interleavings. We address the latter by deleting table +%% entries whenever the server finds a 0-valued entry during the +%% processing of a write/remove. 0 is essentially equivalent to "no +%% entry". If, OTOH, the value is non-zero we know there is at least +%% one other 'write' or 'remove' in flight, so we get an opportunity +%% later to delete the table entry when processing these. +%% +%% There are two further complications. We need to ensure that 1) +%% eliminated writes still get confirmed, and 2) the write-back cache +%% doesn't grow unbounded. These are quite straightforward to +%% address. See the comments in the code. +%% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -392,7 +434,7 @@ successfully_recovered_state(Server) -> client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, CurFileCacheEts} = + FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), #client_msstate { server = Server, @@ -404,7 +446,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }. + cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts }. 
client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), @@ -420,6 +463,7 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref. write(MsgId, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, client_ref = CRef }) -> + ok = client_update_flying(+1, MsgId, CState), ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), ok = server_cast(CState, {write, CRef, MsgId}). @@ -440,6 +484,7 @@ read(MsgId, contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> + [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds], server_cast(CState, {remove, CRef, MsgIds}). set_maximum_since_use(Server, Age) -> @@ -566,6 +611,21 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, end end. +client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, + client_ref = CRef }) -> + Key = {MsgId, CRef}, + case ets:insert_new(FlyingEts, {Key, Diff}) of + true -> ok; + false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) + catch error:badarg -> + %% this is guaranteed to succeed since the + %% server only removes and updates flying_ets + %% entries; it never inserts them + true = ets:insert_new(FlyingEts, {Key, Diff}) + end, + ok + end. 
+ clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, dying_clients = DyingClients }) -> State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), @@ -619,6 +679,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), + FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]), {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), @@ -645,6 +706,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts, dying_clients = sets:new(), clients = Clients, successfully_recovered = CleanShutdown, @@ -700,11 +762,13 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts, clients = Clients, gc_pid = GCPid }) -> Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, - CurFileCacheEts}, State #msstate { clients = Clients1 }); + CurFileCacheEts, FlyingEts}, + State #msstate { clients = Clients1 }); handle_call({client_terminate, CRef}, _From, State) -> reply(ok, clear_client(CRef, State)); @@ -723,40 +787,54 @@ handle_cast({client_dying, CRef}, noreply(write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 })); -handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> +handle_cast({client_delete, CRef}, + State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, MsgId}, State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< 
ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), - [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId), - noreply( - case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of - {write, State1} -> - write_message(CRef, MsgId, Msg, State1); - {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> - State1; - {ignore, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), - State1; - {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> - record_pending_confirm(CRef, MsgId, State1); - {confirm, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), - update_pending_confirms( - fun (MsgOnDiskFun, CTM) -> - MsgOnDiskFun(gb_sets:singleton(MsgId), written), - CTM - end, CRef, State1) - end); + case update_flying(-1, MsgId, CRef, State) of + process -> + [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId), + noreply(write_message(MsgId, Msg, CRef, State)); + ignore -> + %% A 'remove' has already been issued and eliminated the + %% 'write'. + State1 = blind_confirm(CRef, gb_sets:singleton(MsgId), + ignored, State), + %% If all writes get eliminated, cur_file_cache_ets could + %% grow unbounded. To prevent that we delete the cache + %% entry here, but only if the message isn't in the + %% current file. That way reads of the message can + %% continue to be done client side, from either the cache + %% or the non-current files. If the message *is* in the + %% current file then the cache entry will be removed by + %% the normal logic for that in write_message/4 and + %% maybe_roll_to_new_file/2. 
+ case index_lookup(MsgId, State1) of + [#msg_location { file = File }] + when File == State1 #msstate.current_file -> + ok; + _ -> + true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0}) + end, + noreply(State1) + end; handle_cast({remove, CRef, MsgIds}, State) -> - State1 = lists:foldl( - fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end, - State, MsgIds), - noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), - removed, State1))); + {RemovedMsgIds, State1} = + lists:foldl( + fun (MsgId, {Removed, State2}) -> + case update_flying(+1, MsgId, CRef, State2) of + process -> {[MsgId | Removed], + remove_message(MsgId, CRef, State2)}; + ignore -> {Removed, State2} + end + end, {[], State}, MsgIds), + noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds), + ignored, State1))); handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, @@ -797,6 +875,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts, clients = Clients, dir = Dir }) -> %% stop the gc first, otherwise it could be working and we pull @@ -810,8 +889,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, end, State3 = close_all_handles(State1), ok = store_file_summary(FileSummaryEts, Dir), - [true = ets:delete(T) || - T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]], + [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, + CurFileCacheEts, FlyingEts]], IndexModule:terminate(IndexState), ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, {index_module, IndexModule}], Dir), @@ -874,6 +953,19 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, client_confirm(CRef, MsgIds, written, StateN) end, State1, CGs). 
+update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) -> + Key = {MsgId, CRef}, + NDiff = -Diff, + case ets:lookup(FlyingEts, Key) of + [] -> ignore; + [{_, Diff}] -> ignore; + [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}), + true = ets:delete_object(FlyingEts, {Key, 0}), + process; + [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}), + ignore + end. + write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; write_action({true, #msg_location { file = File }}, _MsgId, State) -> @@ -905,8 +997,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, %% field otherwise bad interaction with concurrent GC {confirm, File, State}. -write_message(CRef, MsgId, Msg, State) -> - write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)). +write_message(MsgId, Msg, CRef, + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> + case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of + {write, State1} -> + write_message(MsgId, Msg, + record_pending_confirm(CRef, MsgId, State1)); + {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), + State1; + {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, MsgId, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTM) -> + MsgOnDiskFun(gb_sets:singleton(MsgId), written), + CTM + end, CRef, State1) + end. + +remove_message(MsgId, CRef, + State = #msstate { file_summary_ets = FileSummaryEts }) -> + case should_mask_action(CRef, MsgId, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this msg whilst + %% it's being GC'd. 
+ %% + %% ASSERTION: [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} + when RefCount > 0 -> + %% only update field, otherwise bad interaction with + %% concurrent GC + Dec = fun () -> index_update_ref_count( + MsgId, RefCount - 1, State) end, + case RefCount of + %% don't remove from cur_file_cache_ets here because + %% there may be further writes in the mailbox for the + %% same msg. + 1 -> case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, MsgId, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + delete_file_if_empty( + File, adjust_valid_total_size( + File, -TotalSize, State)) + end; + _ -> ok = Dec(), + State + end + end. write_message(MsgId, Msg, State = #msstate { current_file_handle = CurHdl, @@ -1004,43 +1153,6 @@ contains_message(MsgId, From, end end. -remove_message(MsgId, CRef, - State = #msstate { file_summary_ets = FileSummaryEts }) -> - case should_mask_action(CRef, MsgId, State) of - {true, _Location} -> - State; - {false_if_increment, #msg_location { ref_count = 0 }} -> - %% CRef has tried to both write and remove this msg - %% whilst it's being GC'd. ASSERTION: - %% [#file_summary { locked = true }] = - %% ets:lookup(FileSummaryEts, File), - State; - {_Mask, #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize }} when RefCount > 0 -> - %% only update field, otherwise bad interaction with - %% concurrent GC - Dec = fun () -> - index_update_ref_count(MsgId, RefCount - 1, State) - end, - case RefCount of - %% don't remove from CUR_FILE_CACHE_ETS_NAME here - %% because there may be further writes in the mailbox - %% for the same msg. 
- 1 -> case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> - add_to_pending_gc_completion( - {remove, MsgId, CRef}, File, State); - [#file_summary {}] -> - ok = Dec(), - delete_file_if_empty( - File, adjust_valid_total_size(File, -TotalSize, - State)) - end; - _ -> ok = Dec(), - State - end - end. - add_to_pending_gc_completion( Op, File, State = #msstate { pending_gc_completion = Pending }) -> State #msstate { pending_gc_completion = @@ -1120,6 +1232,11 @@ client_confirm(CRef, MsgIds, ActionTaken, State) -> end end, CRef, State). +blind_confirm(CRef, MsgIds, ActionTaken, State) -> + update_pending_confirms( + fun (MsgOnDiskFun, CTM) -> MsgOnDiskFun(MsgIds, ActionTaken), CTM end, + CRef, State). + %% Detect whether the MsgId is older or younger than the client's death %% msg (if there is one). If the msg is older than the client death %% msg, and it has a 0 ref_count we must only alter the ref_count, not diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index fcfd5557..00d46f5a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1778,7 +1778,7 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> test_msg_store() -> restart_msg_store_empty(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], - {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), + {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds), Ref = rabbit_guid:guid(), {Cap, MSCState} = msg_store_client_init_capture( ?PERSISTENT_MSG_STORE, Ref), @@ -1789,6 +1789,8 @@ test_msg_store() -> false = msg_store_contains(false, MsgIds, MSCState), %% test confirm logic passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState), + %% check we don't contain any of the msgs we're about to publish + false = msg_store_contains(false, MsgIds, MSCState), %% publish the first half ok = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half @@ -1896,12 +1898,12 @@ test_msg_store() -> false = msg_store_contains(false, MsgIdsBig, 
MSCStateM), MSCStateM end), + %% + passed = test_msg_store_client_delete_and_terminate(), %% restart empty restart_msg_store_empty(), passed. -%% We want to test that writes that get eliminated due to removes still -%% get confirmed. Removes themselves do not. test_msg_store_confirms(MsgIds, Cap, MSCState) -> %% write -> confirmed ok = msg_store_write(MsgIds, MSCState), @@ -1927,6 +1929,45 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) -> ok = msg_store_write(MsgIds, MSCState), ok = msg_store_remove(MsgIds, MSCState), ok = on_disk_await(Cap, MsgIds), + %% confirmation on timer-based sync + passed = test_msg_store_confirm_timer(), + passed. + +test_msg_store_confirm_timer() -> + Ref = rabbit_guid:guid(), + MsgId = msg_id_bin(1), + Self = self(), + MSCState = rabbit_msg_store:client_init( + ?PERSISTENT_MSG_STORE, Ref, + fun (MsgIds, _ActionTaken) -> + case gb_sets:is_member(MsgId, MsgIds) of + true -> Self ! on_disk; + false -> ok + end + end, undefined), + ok = msg_store_write([MsgId], MSCState), + ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState), + ok = msg_store_remove([MsgId], MSCState), + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), + passed. + +msg_store_keep_busy_until_confirm(MsgIds, MSCState) -> + receive + on_disk -> ok + after 0 -> + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + msg_store_keep_busy_until_confirm(MsgIds, MSCState) + end. + +test_msg_store_client_delete_and_terminate() -> + restart_msg_store_empty(), + MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)], + Ref = rabbit_guid:guid(), + MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), + ok = msg_store_write(MsgIds, MSCState), + %% test the 'dying client' fast path for writes + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), passed. 
queue_name(Name) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 45e852da..63a0927f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1290,7 +1290,7 @@ blind_confirm(Callback, MsgIdSet) -> Callback(?MODULE, fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). -msgs_written_to_disk(Callback, MsgIdSet, removed) -> +msgs_written_to_disk(Callback, MsgIdSet, ignored) -> blind_confirm(Callback, MsgIdSet); msgs_written_to_disk(Callback, MsgIdSet, written) -> Callback(?MODULE, |