summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-26 13:31:00 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-26 13:31:00 +0000
commit08478a808204cf9033cc486edd91eaab6336568b (patch)
treefd4dcb4f08a27ba1e2d602c7b1f7f7a668cd9994
parent89213a4aab4609fdfdedcb181ad5c6fe4eb8e842 (diff)
downloadrabbitmq-server-08478a808204cf9033cc486edd91eaab6336568b.tar.gz
refactor 'write' handler for clarity
-rw-r--r--src/rabbit_msg_store.erl72
1 files changed, 38 insertions, 34 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 448ca47a..18227be1 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -746,44 +746,19 @@ handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, Guid},
- State = #msstate { file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
noreply(
- case should_mask_action(CRef, Guid, State) of
- {true, Loc} ->
- ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
+ case write_action(should_mask_action(CRef, Guid, State), Guid, State) of
+ {write, State1} ->
+ write_message(CRef, Guid, Msg, State1);
+ {ignore, Loc, State1} ->
+ ok = maybe_remove_from_cache(Guid, Loc, Msg, State1),
State;
- {false, not_found} ->
- write_message(CRef, Guid, Msg, State);
- {Mask, #msg_location { ref_count = 0, file = File,
- total_size = TotalSize } = Loc} ->
- case {Mask, ets:lookup(FileSummaryEts, File)} of
- {false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State),
- write_message(CRef, Guid, Msg, State);
- {false_if_increment, [#file_summary { locked = true }]} ->
- %% The msg for Guid is older than the client
- %% death message, but as it is being GC'd
- %% currently we'll have to write a new copy,
- %% which will then be younger, so ignore this
- %% write.
- ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
- State;
- {_Mask, [#file_summary {}]} ->
- ok = index_update_ref_count(Guid, 1, State),
- ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
- State1 = adjust_valid_total_size(File, TotalSize, State),
- client_confirm_if_on_disk(CRef, Guid, File, State1)
- end;
- {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} ->
- %% We already know about it, just update counter. Only
- %% update field otherwise bad interaction with
- %% concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State),
- ok = maybe_remove_from_cache(Guid, Loc, Msg, State),
- client_confirm_if_on_disk(CRef, Guid, File, State)
+ {confirm, Loc, File, State1} ->
+ ok = maybe_remove_from_cache(Guid, Loc, Msg, State1),
+ client_confirm_if_on_disk(CRef, Guid, File, State1)
end);
handle_cast({remove, CRef, Guids}, State) ->
@@ -932,6 +907,35 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
[client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+write_action({true, Loc}, _Guid, State) ->
+ {ignore, Loc, State};
+write_action({false, not_found}, _Guid, State) ->
+ {write, State};
+write_action({Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize } = Loc},
+ Guid, State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
+ ok = index_delete(Guid, State),
+ {write, State};
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but as it is being GC'd currently we'll have
+ %% to write a new copy, which will then be younger, so
+ %% ignore this write.
+ {ignore, Loc, State};
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ State1 = adjust_valid_total_size(File, TotalSize, State),
+ {confirm, Loc, File, State1}
+ end;
+write_action({_Mask, #msg_location { ref_count = RefCount, file = File } = Loc},
+ Guid, State) ->
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ %% We already know about it, just update counter. Only update
+ %% field otherwise bad interaction with concurrent GC
+ {confirm, Loc, File, State}.
+
write_message(CRef, Guid, Msg, State) ->
write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).