summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-12 13:11:25 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-12 13:11:25 +0000
commitac2d11d526600ea98d940f3c94990727d2472bce (patch)
tree8cb537bb8ace0e943c6fbcb2e939f79cf46f2c97
parent883f064599382f440949ae73cc71671eeced7451 (diff)
downloadrabbitmq-server-ac2d11d526600ea98d940f3c94990727d2472bce.tar.gz
msg_store confirms messages that are written to disk, removed and then revived
-rw-r--r--src/rabbit_msg_store.erl33
1 files changed, 22 insertions, 11 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index deb62eb2..87c700e7 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -761,7 +761,6 @@ handle_cast({client_delete, CRef},
handle_cast({write, CRef, Guid},
State = #msstate { file_summary_ets = FileSummaryEts,
- current_file = CurFile,
cur_file_cache_ets = CurFileCacheEts,
client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
@@ -795,20 +794,15 @@ handle_cast({write, CRef, Guid},
noreply(State1);
{_Mask, [#file_summary {}]} ->
ok = index_update_ref_count(Guid, 1, State),
- noreply(adjust_valid_total_size(File, TotalSize, State))
+ State2 =
+ client_confirm_if_on_disk(CRef, Guid, File, State),
+ noreply(adjust_valid_total_size(File, TotalSize, State2))
end;
{_Mask, #msg_location { ref_count = RefCount, file = File }} ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State1),
- CTG2 = case {dict:find(CRef, CODC), File} of
- {{ok, _}, CurFile} -> CTG1;
- {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid),
- written),
- CTG;
- _ -> CTG1
- end,
- noreply(State1 #msstate { cref_to_guids = CTG2 })
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
end;
handle_cast({remove, CRef, Guids}, State) ->
@@ -1166,6 +1160,23 @@ client_confirm(CRef, Guids, ActionTaken,
error -> State
end.
+client_confirm_if_on_disk(CRef, Guid, File,
+ State = #msstate { client_ondisk_callback = CODC,
+ current_file = CurFile,
+ cref_to_guids = CTG }) ->
+ CTG1 = dict:update(CRef, fun(Guids) ->
+ gb_sets:add(Guid, Guids)
+ end,
+ gb_sets:singleton(Guid), CTG),
+ CTG2 = case {dict:find(CRef, CODC), File} of
+ {{ok, _}, CurFile} -> CTG1;
+ {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid),
+ written),
+ CTG;
+ _ -> CTG
+ end,
+ State #msstate { cref_to_guids = CTG2 }.
+
%% Detect whether the Guid is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not