diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-16 18:37:23 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-16 18:37:23 +0000 |
commit | b687b817842e55c8eb09c2ac1ac777241fb596f6 (patch) | |
tree | e700639cba6d991f3e567b0d8131d4102e01358e | |
parent | 0df6298f0ee79c5e91d45cb50f79b8b4c5cf6282 (diff) | |
download | rabbitmq-server-b687b817842e55c8eb09c2ac1ac777241fb596f6.tar.gz |
some cosmetic and minor refactoring for clarity
-rw-r--r-- | src/rabbit_msg_store.erl | 47 |
1 files changed, 25 insertions, 22 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 5dd01483..4028f5d2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -763,18 +763,17 @@ handle_cast({write, CRef, Guid}, cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - State1 = add_cref_to_guids_if_callback(CRef, Guid, State), case should_mask_action(CRef, Guid, State) of {true, _Location} -> noreply(State); {false, not_found} -> - write_message(Guid, Msg, State1); + write_message(CRef, Guid, Msg, State); {Mask, #msg_location { ref_count = 0, file = File, total_size = TotalSize }} -> case {Mask, ets:lookup(FileSummaryEts, File)} of {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State1), - write_message(Guid, Msg, State1); + ok = index_delete(Guid, State), + write_message(CRef, Guid, Msg, State); {false_if_increment, [#file_summary { locked = true }]} -> %% The msg for Guid is older than the client death %% message, but as it is being GC'd currently, @@ -783,8 +782,9 @@ handle_cast({write, CRef, Guid}, noreply(State); {_Mask, [#file_summary {}]} -> ok = index_update_ref_count(Guid, 1, State), - State2 = client_confirm_if_on_disk(CRef, Guid, File, State), - noreply(adjust_valid_total_size(File, TotalSize, State2)) + noreply(client_confirm_if_on_disk( + CRef, Guid, File, + adjust_valid_total_size(File, TotalSize, State))) end; {_Mask, #msg_location { ref_count = RefCount, file = File }} -> %% We already know about it, just update counter. Only @@ -939,6 +939,9 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. +write_message(CRef, Guid, Msg, State) -> + write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). + write_message(Guid, Msg, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, @@ -1139,21 +1142,7 @@ update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, State #msstate { cref_to_guids = CTG1 } end. -client_confirm(CRef, Guids, ActionTaken, State) -> - update_pending_confirms( - fun (MsgOnDiskFun, CTG) -> - MsgOnDiskFun(Guids, ActionTaken), - case dict:find(CRef, CTG) of - {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), - case gb_sets:is_empty(Guids1) of - true -> dict:erase(CRef, CTG); - false -> dict:store(CRef, Guids1, CTG) - end; - error -> CTG - end - end, CRef, State). - -add_cref_to_guids_if_callback(CRef, Guid, State) -> +record_pending_confirm(CRef, Guid, State) -> update_pending_confirms( fun (_MsgOnDiskFun, CTG) -> dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end, @@ -1162,7 +1151,7 @@ add_cref_to_guids_if_callback(CRef, Guid, State) -> client_confirm_if_on_disk(CRef, Guid, CurFile, State = #msstate { current_file = CurFile }) -> - add_cref_to_guids_if_callback(CRef, Guid, State); + record_pending_confirm(CRef, Guid, State); client_confirm_if_on_disk(CRef, Guid, _File, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTG) -> @@ -1170,6 +1159,20 @@ client_confirm_if_on_disk(CRef, Guid, _File, State) -> CTG end, CRef, State). +client_confirm(CRef, Guids, ActionTaken, State) -> + update_pending_confirms( + fun (MsgOnDiskFun, CTG) -> + MsgOnDiskFun(Guids, ActionTaken), + case dict:find(CRef, CTG) of + {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), + case gb_sets:is_empty(Guids1) of + true -> dict:erase(CRef, CTG); + false -> dict:store(CRef, Guids1, CTG) + end; + error -> CTG + end + end, CRef, State). + %% Detect whether the Guid is older or younger than the client's death %% msg (if there is one). If the msg is older than the client death %% msg, and it has a 0 ref_count we must only alter the ref_count, not |