diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-26 13:31:00 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-26 13:31:00 +0000 |
commit | 08478a808204cf9033cc486edd91eaab6336568b (patch) | |
tree | fd4dcb4f08a27ba1e2d602c7b1f7f7a668cd9994 | |
parent | 89213a4aab4609fdfdedcb181ad5c6fe4eb8e842 (diff) | |
download | rabbitmq-server-08478a808204cf9033cc486edd91eaab6336568b.tar.gz |
refactor 'write' handler for clarity
-rw-r--r-- | src/rabbit_msg_store.erl | 72 |
1 files changed, 38 insertions, 34 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 448ca47a..18227be1 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -746,44 +746,19 @@ handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, Guid}, - State = #msstate { file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), noreply( - case should_mask_action(CRef, Guid, State) of - {true, Loc} -> - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), + case write_action(should_mask_action(CRef, Guid, State), Guid, State) of + {write, State1} -> + write_message(CRef, Guid, Msg, State1); + {ignore, Loc, State1} -> + ok = maybe_remove_from_cache(Guid, Loc, Msg, State1), State; - {false, not_found} -> - write_message(CRef, Guid, Msg, State); - {Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize } = Loc} -> - case {Mask, ets:lookup(FileSummaryEts, File)} of - {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State), - write_message(CRef, Guid, Msg, State); - {false_if_increment, [#file_summary { locked = true }]} -> - %% The msg for Guid is older than the client - %% death message, but as it is being GC'd - %% currently we'll have to write a new copy, - %% which will then be younger, so ignore this - %% write. - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - State; - {_Mask, [#file_summary {}]} -> - ok = index_update_ref_count(Guid, 1, State), - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - State1 = adjust_valid_total_size(File, TotalSize, State), - client_confirm_if_on_disk(CRef, Guid, File, State1) - end; - {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} -> - %% We already know about it, just update counter. Only - %% update field otherwise bad interaction with - %% concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State), - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - client_confirm_if_on_disk(CRef, Guid, File, State) + {confirm, Loc, File, State1} -> + ok = maybe_remove_from_cache(Guid, Loc, Msg, State1), + client_confirm_if_on_disk(CRef, Guid, File, State1) end); handle_cast({remove, CRef, Guids}, State) -> @@ -932,6 +907,35 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. +write_action({true, Loc}, _Guid, State) -> + {ignore, Loc, State}; +write_action({false, not_found}, _Guid, State) -> + {write, State}; +write_action({Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize } = Loc}, + Guid, State = #msstate { file_summary_ets = FileSummaryEts }) -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> + ok = index_delete(Guid, State), + {write, State}; + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client death + %% message, but as it is being GC'd currently we'll have + %% to write a new copy, which will then be younger, so + %% ignore this write. + {ignore, Loc, State}; + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + State1 = adjust_valid_total_size(File, TotalSize, State), + {confirm, Loc, File, State1} + end; +write_action({_Mask, #msg_location { ref_count = RefCount, file = File } = Loc}, + Guid, State) -> + ok = index_update_ref_count(Guid, RefCount + 1, State), + %% We already know about it, just update counter. Only update + %% field otherwise bad interaction with concurrent GC + {confirm, Loc, File, State}. + write_message(CRef, Guid, Msg, State) -> write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). |