diff options
authorMatthias Radestock <>2011-01-26 12:37:55 +0000
committerMatthias Radestock <>2011-01-26 12:37:55 +0000
commit89213a4aab4609fdfdedcb181ad5c6fe4eb8e842 (patch)
parent6570ed681706498d09cf4783ce24727ffe5a7f0d (diff)
return State from write_message rather than it calling noreply
This makes the API more consistent (cf read_message etc). Also, shuffle order of some write actions for consistency
1 files changed, 41 insertions, 38 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index ac4cda99..448ca47a 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -738,7 +738,8 @@ handle_call({contains, Guid}, From, State) ->
handle_cast({client_dying, CRef},
State = #msstate { dying_clients = DyingClients }) ->
DyingClients1 = sets:add_element(CRef, DyingClients),
- write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
+ noreply(write_message(CRef, <<>>,
+ State #msstate { dying_clients = DyingClients1 }));
handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
@@ -749,38 +750,41 @@ handle_cast({write, CRef, Guid},
cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- case should_mask_action(CRef, Guid, State) of
- {true, Loc} ->
- ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
- noreply(State);
- {false, not_found} ->
- write_message(CRef, Guid, Msg, State);
- {Mask, #msg_location { ref_count = 0, file = File,
- total_size = TotalSize } = Loc} ->
- case {Mask, ets:lookup(FileSummaryEts, File)} of
- {false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State),
- write_message(CRef, Guid, Msg, State);
- {false_if_increment, [#file_summary { locked = true }]} ->
- %% The msg for Guid is older than the client death
- %% message, but as it is being GC'd currently,
- %% we'll have to write a new copy, which will then
- %% be younger, so ignore this write.
- ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
- noreply(State);
- {_Mask, [#file_summary {}]} ->
- ok = index_update_ref_count(Guid, 1, State),
- ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
- State1 = client_confirm_if_on_disk(CRef, Guid, File, State),
- noreply(adjust_valid_total_size(File, TotalSize, State1))
- end;
- {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} ->
- %% We already know about it, just update counter. Only
- %% update field otherwise bad interaction with concurrent GC
- ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
- ok = index_update_ref_count(Guid, RefCount + 1, State),
- noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
- end;
+ noreply(
+ case should_mask_action(CRef, Guid, State) of
+ {true, Loc} ->
+ ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
+ State;
+ {false, not_found} ->
+ write_message(CRef, Guid, Msg, State);
+ {Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize } = Loc} ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
+ ok = index_delete(Guid, State),
+ write_message(CRef, Guid, Msg, State);
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client
+ %% death message, but as it is being GC'd
+ %% currently we'll have to write a new copy,
+ %% which will then be younger, so ignore this
+ %% write.
+ ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
+ State;
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
+ State1 = adjust_valid_total_size(File, TotalSize, State),
+ client_confirm_if_on_disk(CRef, Guid, File, State1)
+ end;
+ {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} ->
+ %% We already know about it, just update counter. Only
+ %% update field otherwise bad interaction with
+ %% concurrent GC
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
+ client_confirm_if_on_disk(CRef, Guid, File, State)
+ end);
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
@@ -947,11 +951,10 @@ write_message(Guid, Msg,
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]),
- NextOffset = CurOffset + TotalSize,
- noreply(maybe_roll_to_new_file(
- NextOffset, State #msstate {
- sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize })).
+ maybe_roll_to_new_file(CurOffset + TotalSize,
+ State #msstate {
+ sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }).
read_message(Guid, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->