summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-10-10 18:26:18 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-10-10 18:26:18 +0100
commit0eca27507a30da56461c85fa60ec02eb8851c9e6 (patch)
tree23791892bcf8f2a0e904a7f32402b129ad8ee1cd /src/rabbit_msg_store.erl
parentbc4f80d231c91481cb690424d6e2a43b7eb2ac60 (diff)
downloadrabbitmq-server-0eca27507a30da56461c85fa60ec02eb8851c9e6.tar.gz
there's more than one call site for remove_message...
...and only one of them must pay attention to flying_ets
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl82
1 files changed, 41 insertions, 41 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index edb1d8b1..1c2603fa 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -803,8 +803,12 @@ handle_cast({write, CRef, MsgId},
handle_cast({remove, CRef, MsgIds}, State) ->
State1 = lists:foldl(
- fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end,
- State, MsgIds),
+ fun (MsgId, State2) ->
+ case update_flying(+1, MsgId, CRef, State2) of
+ process -> remove_message(MsgId, CRef, State2);
+ ignore -> State2
+ end
+ end, State, MsgIds),
noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
removed, State1)));
@@ -1069,45 +1073,41 @@ contains_message(MsgId, From,
end.
remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts }) ->
- case update_flying(+1, MsgId, CRef, State) of
- process ->
- case should_mask_action(CRef, MsgId, State) of
- {true, _Location} ->
- State;
- {false_if_increment, #msg_location { ref_count = 0 }} ->
- %% CRef has tried to both write and remove this
- %% msg whilst it's being GC'd. ASSERTION:
- %% [#file_summary { locked = true }] =
- %% ets:lookup(FileSummaryEts, File),
- State;
- {_Mask, #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize }}
- when RefCount > 0 ->
- %% only update field, otherwise bad interaction
- %% with concurrent GC
- Dec = fun () -> index_update_ref_count(
- MsgId, RefCount - 1, State) end,
- case RefCount of
- %% don't remove from CUR_FILE_CACHE_ETS_NAME
- %% here because there may be further writes in
- %% the mailbox for the same msg.
- 1 -> case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
- add_to_pending_gc_completion(
- {remove, MsgId, CRef}, File, State);
- [#file_summary {}] ->
- ok = Dec(),
- delete_file_if_empty(
- File, adjust_valid_total_size(
- File, -TotalSize, State))
- end;
- _ -> ok = Dec(),
- State
- end
- end;
- ignore ->
- State
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case should_mask_action(CRef, MsgId, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg whilst
+ %% it's being GC'd.
+ %%
+ %% ASSERTION: [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }}
+ when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec = fun () -> index_update_ref_count(
+ MsgId, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from cur_file_cache_ets here because
+ %% there may be further writes in the mailbox for the
+ %% same msg.
+ 1 -> case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, MsgId, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(
+ File, -TotalSize, State))
+ end;
+ _ -> ok = Dec(),
+ State
+ end
end.
add_to_pending_gc_completion(