author    Matthew Sackman <matthew@rabbitmq.com>   2010-12-06 15:08:51 +0000
committer Matthew Sackman <matthew@rabbitmq.com>   2010-12-06 15:08:51 +0000
commit    f78d7d205df06f4d90cceca928349a7bdb90dec4 (patch)
tree      40ec65cea4c3941ce66b98c0df6442b364c840f0
parent    5d3b291386560f1d9629be9293f2d624779ef322 (diff)
download  rabbitmq-server-f78d7d205df06f4d90cceca928349a7bdb90dec4.tar.gz
Reimplement or manually merge all past changes for 23329 onto the new head of default. The code is generally a bit cleaner due to better APIs within msg_store.
-rw-r--r--  src/rabbit_msg_store.erl  158
-rw-r--r--  src/rabbit_tests.erl        4
2 files changed, 117 insertions, 45 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e8b4e8e2..a85de453 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -81,6 +81,7 @@
file_summary_ets, %% tid of the file summary table
dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
+ dying_clients_ets, %% tid of the dying clients table
client_refs, %% set of references of all registered clients
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
@@ -361,6 +362,7 @@ client_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
+ ok = server_cast(CState, {client_dying, Ref}),
ok = server_cast(CState, {client_delete, Ref}).
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
@@ -580,6 +582,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
+ DyingClientsEts = ets:new(rabbit_msg_store_terminal, [set]),
+
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
State = #msstate { dir = Dir,
@@ -598,6 +602,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
+ dying_clients_ets = DyingClientsEts,
client_refs = ClientRefs1,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
@@ -643,6 +648,7 @@ prioritise_cast(Msg, _State) ->
{combine_files, _Source, _Destination, _Reclaimed} -> 8;
{delete_file, _File, _Reclaimed} -> 8;
{set_maximum_since_use, _Age} -> 8;
+ {client_dying, _Pid} -> 7;
_ -> 0
end.
@@ -681,11 +687,19 @@ handle_call({contains, Guid}, From, State) ->
State1 = contains_message(Guid, From, State),
noreply(State1).
+handle_cast({client_dying, CRef},
+ State = #msstate { dying_clients_ets = DyingClientsEts }) ->
+ true = ets:insert_new(DyingClientsEts, {CRef, const}),
+ write_message(CRef, <<>>, State);
+
handle_cast({client_delete, CRef},
- State = #msstate { client_refs = ClientRefs }) ->
+ State = #msstate { client_refs = ClientRefs,
+ dying_clients_ets = DyingClientsEts }) ->
+ true = ets:delete(DyingClientsEts, CRef),
State1 = clear_client_callback(CRef, State),
- noreply(State1 #msstate {
- client_refs = sets:del_element(CRef, ClientRefs) });
+ ClientRefs1 = sets:del_element(CRef, ClientRefs),
+ noreply(remove_message(CRef, CRef,
+ State1 #msstate { client_refs = ClientRefs1 }));
handle_cast({write, CRef, Guid},
State = #msstate { sum_valid_data = SumValid,
@@ -705,23 +719,33 @@ handle_cast({write, CRef, Guid},
error -> CTG
end,
State1 = State #msstate { cref_to_guids = CTG1 },
- case index_lookup(Guid, State1) of
- not_found ->
+ case should_mask_action(CRef, Guid, State1) of
+ {true, _Location} ->
+ noreply(State1);
+ {false, not_found} ->
write_message(Guid, Msg, State1);
- #msg_location { ref_count = 0, file = File, total_size = TotalSize } ->
- case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
+ {Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize }} ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
ok = index_delete(Guid, State1),
write_message(Guid, Msg, State1);
- [#file_summary {}] ->
- ok = index_update_ref_count(Guid, 1, State1),
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but since its file is currently being
+ %% GC'd we would have to write a new copy, which
+ %% would then be younger than the death msg, so we
+ %% ignore this write instead.
+ noreply(State1);
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
[_] = ets:update_counter(
FileSummaryEts, File,
[{#file_summary.valid_total_size, TotalSize}]),
noreply(State1 #msstate {
sum_valid_data = SumValid + TotalSize })
end;
- #msg_location { ref_count = RefCount, file = File } ->
+ {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
ok = index_update_ref_count(Guid, RefCount + 1, State1),
@@ -731,12 +755,12 @@ handle_cast({write, CRef, Guid},
CTG;
_ -> CTG1
end,
- noreply(State #msstate { cref_to_guids = CTG2 })
+ noreply(State1 #msstate { cref_to_guids = CTG2 })
end;
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
- fun (Guid, State2) -> remove_message(Guid, State2) end,
+ fun (Guid, State2) -> remove_message(Guid, CRef, State2) end,
State, Guids),
State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
noreply(maybe_compact(State2));
@@ -801,6 +825,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
+ dying_clients_ets = DyingClientsEts,
client_refs = ClientRefs,
dir = Dir }) ->
%% stop the gc first, otherwise it could be working and we pull
@@ -814,8 +839,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
end,
State3 = close_all_handles(State1),
store_file_summary(FileSummaryEts, Dir),
- [ets:delete(T) ||
- T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
+ [ets:delete(T) || T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts,
+ CurFileCacheEts, DyingClientsEts]],
IndexModule:terminate(IndexState),
store_recovery_terms([{client_refs, sets:to_list(ClientRefs)},
{index_module, IndexModule}], Dir),
@@ -990,34 +1015,48 @@ contains_message(Guid, From,
end
end.
-remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
- #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize } =
- index_lookup_positive_ref_count(Guid, State),
- %% only update field, otherwise bad interaction with concurrent GC
- Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
- case RefCount of
- %% don't remove from CUR_FILE_CACHE_ETS_NAME here because
- %% there may be further writes in the mailbox for the same
- %% msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
- case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true } ] ->
- add_to_pending_gc_completion({remove, Guid}, File, State);
- [#file_summary {}] ->
+remove_message(Guid, CRef,
+ State = #msstate { sum_valid_data = SumValid,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts }) ->
+ case should_mask_action(CRef, Guid, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg
+ %% whilst it's being GC'd. ASSERTION:
+ %% [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }} when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec =
+ fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from CUR_FILE_CACHE_ETS_NAME here
+ %% because there may be further writes in the mailbox
+ %% for the same msg.
+ 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
+ case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, Guid, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ [_] = ets:update_counter(
+ FileSummaryEts, File,
+ [{#file_summary.valid_total_size,
+ -TotalSize}]),
+ delete_file_if_empty(
+ File, State #msstate {
+ sum_valid_data = SumValid - TotalSize })
+ end;
+ _ -> ok = decrement_cache(DedupCacheEts, Guid),
ok = Dec(),
- [_] = ets:update_counter(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, -TotalSize}]),
- delete_file_if_empty(
- File, State #msstate {
- sum_valid_data = SumValid - TotalSize })
- end;
- _ -> ok = decrement_cache(DedupCacheEts, Guid),
- ok = Dec(),
- State
+ State
+ end
end.
add_to_pending_gc_completion(
@@ -1039,8 +1078,8 @@ run_pending_action({read, Guid, From}, State) ->
read_message(Guid, From, State);
run_pending_action({contains, Guid, From}, State) ->
contains_message(Guid, From, State);
-run_pending_action({remove, Guid}, State) ->
- remove_message(Guid, State).
+run_pending_action({remove, Guid, CRef}, State) ->
+ remove_message(Guid, CRef, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
try
@@ -1073,6 +1112,37 @@ client_confirm(CRef, Guids,
error -> State
end.
+%% Detect whether the Guid is older or younger than the client's death
+%% msg (if there is one). If the msg is older than the client death
+%% msg and has a ref_count of 0, we may only alter its ref_count, not
+%% rewrite the msg - rewriting it would make it younger than the death
+%% msg, and such a write must therefore be ignored. Note that this
+%% will (correctly) return false when testing to remove the death msg
+%% itself.
+should_mask_action(CRef, Guid,
+ State = #msstate { dying_clients_ets = DyingClientsEts }) ->
+ Location = index_lookup(Guid, State),
+ {case ets:lookup(DyingClientsEts, CRef) of
+ [] ->
+ false;
+ [{_CRef, const}] ->
+ case Location of
+ not_found ->
+ true;
+ #msg_location { file = File, offset = Offset,
+ ref_count = RefCount } ->
+ #msg_location { file = DeathFile, offset = DeathOffset } =
+ index_lookup(CRef, State),
+ case {DeathFile, DeathOffset} < {File, Offset} of
+ true -> true;
+ false -> case RefCount of
+ 0 -> false_if_increment;
+ _ -> false
+ end
+ end
+ end
+ end, Location}.
+
%%----------------------------------------------------------------------------
%% file helper functions
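The masking scheme above can be read as a small decision function: client_delete_and_terminate/1 now casts {client_dying, Ref} first, the server records the ref in dying_clients_ets and writes an empty message (the "death msg") under that ref, and every later write or remove from that client is compared against the death msg's position in the store. A standalone sketch of that decision follows, with illustrative module and function names that are not part of the commit:

-module(mask_sketch).
-export([decision/3]).

%% Dying    : whether the client ref appears in the dying-clients table
%% DeathLoc : {File, Offset} of the client's death msg
%% MsgLoc   : not_found, or {File, Offset, RefCount} for the msg in question
decision(false, _DeathLoc, _MsgLoc) ->
    false;                                  %% client not dying: never mask
decision(true, _DeathLoc, not_found) ->
    true;                                   %% msg unknown: mask the action
decision(true, {DeathFile, DeathOffset}, {File, Offset, RefCount}) ->
    case {DeathFile, DeathOffset} < {File, Offset} of
        true                      -> true;  %% msg younger than the death msg: mask
        false when RefCount =:= 0 -> false_if_increment; %% older, 0 refs: mask only if a rewrite would be needed
        false                     -> false  %% older, positive refs: don't mask
    end.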
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b9edad9a..8de5adbe 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1664,7 +1664,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
false -> ?TRANSIENT_MSG_STORE
end,
MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined),
- {A, B} =
+ {A, B = [{_SeqId, LastGuidWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
Guid = rabbit_guid:guid(),
@@ -1673,6 +1673,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
ok = rabbit_msg_store:write(Guid, Guid, MSCState),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]}
end, {Qi, []}, SeqIds),
+ %% do this just to force all of the publishes through to the msg_store:
+ true = rabbit_msg_store:contains(LastGuidWritten, MSCState),
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
{A, B}.
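The test's new contains/2 call acts as a flush barrier: the preceding rabbit_msg_store:write/3 calls are asynchronous, and the comment notes the call is there just to force those publishes through to the msg_store before the client is torn down. A minimal sketch of that general pattern against a plain gen_server (hypothetical server name and message shapes, not the msg_store API); because message order between two processes is preserved, the synchronous call cannot be answered before the earlier casts have been handled:

%% Assumes a gen_server registered as my_store that handles
%% {write, Item} casts and replies true to a {contains, Item} call.
%% Items must be non-empty.
flush_writes(Items) ->
    [ok = gen_server:cast(my_store, {write, I}) || I <- Items],
    Last = lists:last(Items),
    %% This call is dequeued after all of the casts above, so by the
    %% time it returns every earlier write has been processed.
    true = gen_server:call(my_store, {contains, Last}, infinity).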