summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl55
1 files changed, 33 insertions, 22 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index fc49b0e7..f8e12cb8 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -759,34 +759,39 @@ handle_cast({write, CRef, MsgId},
noreply(write_message(MsgId, Msg, CRef, State));
ignore ->
%% A 'remove' has already been issued and eliminated the
- %% 'write'. If all writes get eliminated,
- %% cur_file_cache_ets could grow unbounded. To prevent
- %% that we delete the cache entry here, but only if the
- %% message isn't in the current file. That way reads of
- %% the message can continue to be done client side, from
- %% either the cache or the non-current files. If the
- %% message *is* in the current file then the cache entry
- %% will be removed by the normal logic for that in
- %% write_message/4 and maybe_roll_to_new_file/2.
- case index_lookup(MsgId, State) of
+ %% 'write'.
+ State1 = blind_confirm(CRef, gb_sets:singleton(MsgId),
+ removed, State),
+ %% If all writes get eliminated, cur_file_cache_ets could
+ %% grow unbounded. To prevent that we delete the cache
+ %% entry here, but only if the message isn't in the
+ %% current file. That way reads of the message can
+ %% continue to be done client side, from either the cache
+ %% or the non-current files. If the message *is* in the
+ %% current file then the cache entry will be removed by
+ %% the normal logic for that in write_message/4 and
+ %% maybe_roll_to_new_file/2.
+ case index_lookup(MsgId, State1) of
[#msg_location { file = File }]
- when File == State #msstate.current_file ->
+ when File == State1 #msstate.current_file ->
ok;
_ ->
true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0})
end,
- noreply(State)
+ noreply(State1)
end;
handle_cast({remove, CRef, MsgIds}, State) ->
- State1 = lists:foldl(
- fun (MsgId, State2) ->
- case update_flying(+1, MsgId, CRef, State2) of
- process -> remove_message(MsgId, CRef, State2);
- ignore -> State2
- end
- end, State, MsgIds),
- noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
+ {RemovedMsgIds, State1} =
+ lists:foldl(
+ fun (MsgId, {Removed, State2}) ->
+ case update_flying(+1, MsgId, CRef, State2) of
+ process -> {[MsgId, Removed],
+ remove_message(MsgId, CRef, State2)};
+ ignore -> {Removed, State2}
+ end
+ end, {[], State}, MsgIds),
+ noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds),
removed, State1)));
handle_cast({combine_files, Source, Destination, Reclaimed},
@@ -1165,9 +1170,10 @@ record_pending_confirm(CRef, MsgId, State) ->
client_confirm(CRef, MsgIds, ActionTaken, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTM) ->
- MsgOnDiskFun(MsgIds, ActionTaken),
case dict:find(CRef, CTM) of
- {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds),
+ {ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds),
+ ActionTaken),
+ MsgIds1 = gb_sets:difference(Gs, MsgIds),
case gb_sets:is_empty(MsgIds1) of
true -> dict:erase(CRef, CTM);
false -> dict:store(CRef, MsgIds1, CTM)
@@ -1176,6 +1182,11 @@ client_confirm(CRef, MsgIds, ActionTaken, State) ->
end
end, CRef, State).
+blind_confirm(CRef, MsgIds, ActionTaken, State) ->
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTM) -> MsgOnDiskFun(MsgIds, ActionTaken), CTM end,
+ CRef, State).
+
%% Detect whether the MsgId is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not