diff options
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r-- | src/rabbit_msg_store.erl | 55 |
1 files changed, 33 insertions, 22 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index fc49b0e7..f8e12cb8 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -759,34 +759,39 @@ handle_cast({write, CRef, MsgId}, noreply(write_message(MsgId, Msg, CRef, State)); ignore -> %% A 'remove' has already been issued and eliminated the - %% 'write'. If all writes get eliminated, - %% cur_file_cache_ets could grow unbounded. To prevent - %% that we delete the cache entry here, but only if the - %% message isn't in the current file. That way reads of - %% the message can continue to be done client side, from - %% either the cache or the non-current files. If the - %% message *is* in the current file then the cache entry - %% will be removed by the normal logic for that in - %% write_message/4 and maybe_roll_to_new_file/2. - case index_lookup(MsgId, State) of + %% 'write'. + State1 = blind_confirm(CRef, gb_sets:singleton(MsgId), + removed, State), + %% If all writes get eliminated, cur_file_cache_ets could + %% grow unbounded. To prevent that we delete the cache + %% entry here, but only if the message isn't in the + %% current file. That way reads of the message can + %% continue to be done client side, from either the cache + %% or the non-current files. If the message *is* in the + %% current file then the cache entry will be removed by + %% the normal logic for that in write_message/4 and + %% maybe_roll_to_new_file/2. + case index_lookup(MsgId, State1) of [#msg_location { file = File }] - when File == State #msstate.current_file -> + when File == State1 #msstate.current_file -> ok; _ -> true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0}) end, - noreply(State) + noreply(State1) end; handle_cast({remove, CRef, MsgIds}, State) -> - State1 = lists:foldl( - fun (MsgId, State2) -> - case update_flying(+1, MsgId, CRef, State2) of - process -> remove_message(MsgId, CRef, State2); - ignore -> State2 - end - end, State, MsgIds), - noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), + {RemovedMsgIds, State1} = + lists:foldl( + fun (MsgId, {Removed, State2}) -> + case update_flying(+1, MsgId, CRef, State2) of + process -> {[MsgId, Removed], + remove_message(MsgId, CRef, State2)}; + ignore -> {Removed, State2} + end + end, {[], State}, MsgIds), + noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds), removed, State1))); handle_cast({combine_files, Source, Destination, Reclaimed}, @@ -1165,9 +1170,10 @@ record_pending_confirm(CRef, MsgId, State) -> client_confirm(CRef, MsgIds, ActionTaken, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTM) -> - MsgOnDiskFun(MsgIds, ActionTaken), case dict:find(CRef, CTM) of - {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds), + {ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds), + ActionTaken), + MsgIds1 = gb_sets:difference(Gs, MsgIds), case gb_sets:is_empty(MsgIds1) of true -> dict:erase(CRef, CTM); false -> dict:store(CRef, MsgIds1, CTM) @@ -1176,6 +1182,11 @@ client_confirm(CRef, MsgIds, ActionTaken, State) -> end end, CRef, State). +blind_confirm(CRef, MsgIds, ActionTaken, State) -> + update_pending_confirms( + fun (MsgOnDiskFun, CTM) -> MsgOnDiskFun(MsgIds, ActionTaken), CTM end, + CRef, State). + %% Detect whether the MsgId is older or younger than the client's death %% msg (if there is one). If the msg is older than the client death %% msg, and it has a 0 ref_count we must only alter the ref_count, not |