diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-26 12:37:55 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-26 12:37:55 +0000 |
commit | 89213a4aab4609fdfdedcb181ad5c6fe4eb8e842 (patch) | |
tree | 70979ba247d28f3796c6048ba40d735a64022040 | |
parent | 6570ed681706498d09cf4783ce24727ffe5a7f0d (diff) | |
download | rabbitmq-server-89213a4aab4609fdfdedcb181ad5c6fe4eb8e842.tar.gz |
return State from write_message rather than it calling noreply
This makes the API more consistent (cf read_message etc).
Also, shuffle order of some write actions for consistency
-rw-r--r-- | src/rabbit_msg_store.erl | 79 |
1 files changed, 41 insertions, 38 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index ac4cda99..448ca47a 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -738,7 +738,8 @@ handle_call({contains, Guid}, From, State) -> handle_cast({client_dying, CRef}, State = #msstate { dying_clients = DyingClients }) -> DyingClients1 = sets:add_element(CRef, DyingClients), - write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); + noreply(write_message(CRef, <<>>, + State #msstate { dying_clients = DyingClients1 })); handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, @@ -749,38 +750,41 @@ handle_cast({write, CRef, Guid}, cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - case should_mask_action(CRef, Guid, State) of - {true, Loc} -> - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - noreply(State); - {false, not_found} -> - write_message(CRef, Guid, Msg, State); - {Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize } = Loc} -> - case {Mask, ets:lookup(FileSummaryEts, File)} of - {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State), - write_message(CRef, Guid, Msg, State); - {false_if_increment, [#file_summary { locked = true }]} -> - %% The msg for Guid is older than the client death - %% message, but as it is being GC'd currently, - %% we'll have to write a new copy, which will then - %% be younger, so ignore this write. - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - noreply(State); - {_Mask, [#file_summary {}]} -> - ok = index_update_ref_count(Guid, 1, State), - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - State1 = client_confirm_if_on_disk(CRef, Guid, File, State), - noreply(adjust_valid_total_size(File, TotalSize, State1)) - end; - {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} -> - %% We already know about it, just update counter. Only - %% update field otherwise bad interaction with concurrent GC - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - ok = index_update_ref_count(Guid, RefCount + 1, State), - noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) - end; + noreply( + case should_mask_action(CRef, Guid, State) of + {true, Loc} -> + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), + State; + {false, not_found} -> + write_message(CRef, Guid, Msg, State); + {Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize } = Loc} -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> + ok = index_delete(Guid, State), + write_message(CRef, Guid, Msg, State); + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client + %% death message, but as it is being GC'd + %% currently we'll have to write a new copy, + %% which will then be younger, so ignore this + %% write. + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), + State; + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), + State1 = adjust_valid_total_size(File, TotalSize, State), + client_confirm_if_on_disk(CRef, Guid, File, State1) + end; + {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} -> + %% We already know about it, just update counter. Only + %% update field otherwise bad interaction with + %% concurrent GC + ok = index_update_ref_count(Guid, RefCount + 1, State), + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), + client_confirm_if_on_disk(CRef, Guid, File, State) + end); handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( @@ -947,11 +951,10 @@ write_message(Guid, Msg, [_,_] = ets:update_counter(FileSummaryEts, CurFile, [{#file_summary.valid_total_size, TotalSize}, {#file_summary.file_size, TotalSize}]), - NextOffset = CurOffset + TotalSize, - noreply(maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })). + maybe_roll_to_new_file(CurOffset + TotalSize, + State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize }). read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> |