summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-25 19:00:36 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-25 19:00:36 +0000
commit6570ed681706498d09cf4783ce24727ffe5a7f0d (patch)
tree8819e588e1574a9a6a8c07ad83423e883f7a7fcb
parent75726260fd78b409b206d92d83125bfed1d6dd5e (diff)
downloadrabbitmq-server-6570ed681706498d09cf4783ce24727ffe5a7f0d.tar.gz
If we don't actually write the msg and the msg isn't in the current file then delete it if its pending write count is 0
-rw-r--r--src/rabbit_msg_store.erl18
1 files changed, 15 insertions, 3 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 529e3e07..ac4cda99 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -750,12 +750,13 @@ handle_cast({write, CRef, Guid},
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
case should_mask_action(CRef, Guid, State) of
- {true, _Location} ->
+ {true, Loc} ->
+ ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
noreply(State);
{false, not_found} ->
write_message(CRef, Guid, Msg, State);
{Mask, #msg_location { ref_count = 0, file = File,
- total_size = TotalSize }} ->
+ total_size = TotalSize } = Loc} ->
case {Mask, ets:lookup(FileSummaryEts, File)} of
{false, [#file_summary { locked = true }]} ->
ok = index_delete(Guid, State),
@@ -765,15 +766,18 @@ handle_cast({write, CRef, Guid},
%% message, but as it is being GC'd currently,
%% we'll have to write a new copy, which will then
%% be younger, so ignore this write.
+ ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
noreply(State);
{_Mask, [#file_summary {}]} ->
ok = index_update_ref_count(Guid, 1, State),
+ ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
State1 = client_confirm_if_on_disk(CRef, Guid, File, State),
noreply(adjust_valid_total_size(File, TotalSize, State1))
end;
- {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
+ {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
+ ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
ok = index_update_ref_count(Guid, RefCount + 1, State),
noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
end;
@@ -1108,6 +1112,14 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
+maybe_remove_from_cache(_Guid, #msg_location { file = CurFile }, _Msg,
+ #msstate { current_file = CurFile }) ->
+ ok;
+maybe_remove_from_cache(Guid, _Location, Msg,
+ #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ ok.
+
adjust_valid_total_size(File, Delta, State = #msstate {
sum_valid_data = SumValid,
file_summary_ets = FileSummaryEts }) ->