diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-25 19:00:36 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-25 19:00:36 +0000 |
commit | 6570ed681706498d09cf4783ce24727ffe5a7f0d (patch) | |
tree | 8819e588e1574a9a6a8c07ad83423e883f7a7fcb | |
parent | 75726260fd78b409b206d92d83125bfed1d6dd5e (diff) | |
download | rabbitmq-server-6570ed681706498d09cf4783ce24727ffe5a7f0d.tar.gz |
If we don't actually write the msg and the msg isn't in the current file then delete it if its pending write count is 0
-rw-r--r-- | src/rabbit_msg_store.erl | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 529e3e07..ac4cda99 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -750,12 +750,13 @@ handle_cast({write, CRef, Guid}, true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case should_mask_action(CRef, Guid, State) of - {true, _Location} -> + {true, Loc} -> + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), noreply(State); {false, not_found} -> write_message(CRef, Guid, Msg, State); {Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize }} -> + total_size = TotalSize } = Loc} -> case {Mask, ets:lookup(FileSummaryEts, File)} of {false, [#file_summary { locked = true }]} -> ok = index_delete(Guid, State), @@ -765,15 +766,18 @@ handle_cast({write, CRef, Guid}, %% message, but as it is being GC'd currently, %% we'll have to write a new copy, which will then %% be younger, so ignore this write. + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), noreply(State); {_Mask, [#file_summary {}]} -> ok = index_update_ref_count(Guid, 1, State), + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), State1 = client_confirm_if_on_disk(CRef, Guid, File, State), noreply(adjust_valid_total_size(File, TotalSize, State1)) end; - {_Mask, #msg_location { ref_count = RefCount, file = File }} -> + {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), ok = index_update_ref_count(Guid, RefCount + 1, State), noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) end; @@ -1108,6 +1112,14 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). +maybe_remove_from_cache(_Guid, #msg_location { file = CurFile }, _Msg, + #msstate { current_file = CurFile }) -> + ok; +maybe_remove_from_cache(Guid, _Location, Msg, + #msstate { cur_file_cache_ets = CurFileCacheEts }) -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + ok. + adjust_valid_total_size(File, Delta, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts }) -> |