summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-16 18:37:23 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-16 18:37:23 +0000
commitb687b817842e55c8eb09c2ac1ac777241fb596f6 (patch)
treee700639cba6d991f3e567b0d8131d4102e01358e
parent0df6298f0ee79c5e91d45cb50f79b8b4c5cf6282 (diff)
downloadrabbitmq-server-b687b817842e55c8eb09c2ac1ac777241fb596f6.tar.gz
some cosmetic and minor refactoring for clarity
-rw-r--r--src/rabbit_msg_store.erl47
1 files changed, 25 insertions, 22 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 5dd01483..4028f5d2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -763,18 +763,17 @@ handle_cast({write, CRef, Guid},
cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- State1 = add_cref_to_guids_if_callback(CRef, Guid, State),
case should_mask_action(CRef, Guid, State) of
{true, _Location} ->
noreply(State);
{false, not_found} ->
- write_message(Guid, Msg, State1);
+ write_message(CRef, Guid, Msg, State);
{Mask, #msg_location { ref_count = 0, file = File,
total_size = TotalSize }} ->
case {Mask, ets:lookup(FileSummaryEts, File)} of
{false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State1),
- write_message(Guid, Msg, State1);
+ ok = index_delete(Guid, State),
+ write_message(CRef, Guid, Msg, State);
{false_if_increment, [#file_summary { locked = true }]} ->
%% The msg for Guid is older than the client death
%% message, but as it is being GC'd currently,
@@ -783,8 +782,9 @@ handle_cast({write, CRef, Guid},
noreply(State);
{_Mask, [#file_summary {}]} ->
ok = index_update_ref_count(Guid, 1, State),
- State2 = client_confirm_if_on_disk(CRef, Guid, File, State),
- noreply(adjust_valid_total_size(File, TotalSize, State2))
+ noreply(client_confirm_if_on_disk(
+ CRef, Guid, File,
+ adjust_valid_total_size(File, TotalSize, State)))
end;
{_Mask, #msg_location { ref_count = RefCount, file = File }} ->
%% We already know about it, just update counter. Only
@@ -939,6 +939,9 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
[client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+write_message(CRef, Guid, Msg, State) ->
+ write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).
+
write_message(Guid, Msg,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
@@ -1139,21 +1142,7 @@ update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients,
State #msstate { cref_to_guids = CTG1 }
end.
-client_confirm(CRef, Guids, ActionTaken, State) ->
- update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(Guids, ActionTaken),
- case dict:find(CRef, CTG) of
- {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids),
- case gb_sets:is_empty(Guids1) of
- true -> dict:erase(CRef, CTG);
- false -> dict:store(CRef, Guids1, CTG)
- end;
- error -> CTG
- end
- end, CRef, State).
-
-add_cref_to_guids_if_callback(CRef, Guid, State) ->
+record_pending_confirm(CRef, Guid, State) ->
update_pending_confirms(
fun (_MsgOnDiskFun, CTG) ->
dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end,
@@ -1162,7 +1151,7 @@ add_cref_to_guids_if_callback(CRef, Guid, State) ->
client_confirm_if_on_disk(CRef, Guid, CurFile,
State = #msstate { current_file = CurFile }) ->
- add_cref_to_guids_if_callback(CRef, Guid, State);
+ record_pending_confirm(CRef, Guid, State);
client_confirm_if_on_disk(CRef, Guid, _File, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTG) ->
@@ -1170,6 +1159,20 @@ client_confirm_if_on_disk(CRef, Guid, _File, State) ->
CTG
end, CRef, State).
+client_confirm(CRef, Guids, ActionTaken, State) ->
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTG) ->
+ MsgOnDiskFun(Guids, ActionTaken),
+ case dict:find(CRef, CTG) of
+ {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids),
+ case gb_sets:is_empty(Guids1) of
+ true -> dict:erase(CRef, CTG);
+ false -> dict:store(CRef, Guids1, CTG)
+ end;
+ error -> CTG
+ end
+ end, CRef, State).
+
%% Detect whether the Guid is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not