author     David Wragg <david@rabbitmq.com>    2011-04-01 12:31:09 +0100
committer  David Wragg <david@rabbitmq.com>    2011-04-01 12:31:09 +0100
commit     e4e468b10e86523b95ae9705df21af31f59fb56e (patch)
tree       2a139cc3498304a985849daba6abd5da4ccb6a5c
parent     26b7b740905b3c15559c81cc0e945890ae8ec4d4 (diff)
parent     0152526718c71f31b010fb58064aefa969e03143 (diff)
Merge bug24007 into default
'rabbitmqctl wait' can wait forever on broker failure
-rw-r--r--  src/rabbit_msg_store.erl       | 144
-rw-r--r--  src/rabbit_tests.erl           |   2
-rw-r--r--  src/rabbit_variable_queue.erl  |   7
3 files changed, 34 insertions, 119 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index bb26de64..34c793ec 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2, release/2, sync/3]).
+ write/3, read/2, contains/2, remove/2, sync/3]).
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
@@ -67,7 +67,6 @@
gc_pid, %% pid of our GC
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
- dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
@@ -87,7 +86,6 @@
gc_pid,
file_handles_ets,
file_summary_ets,
- dedup_cache_ets,
cur_file_cache_ets
}).
@@ -130,7 +128,6 @@
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
- dedup_cache_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid()}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
@@ -153,7 +150,6 @@
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
--spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
-spec(sync/3 ::
([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
@@ -396,7 +392,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -408,7 +404,6 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
@@ -429,27 +424,16 @@ write(MsgId, Msg,
ok = server_cast(CState, {write, CRef, MsgId}).
read(MsgId,
- CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
- %% 1. Check the dedup cache
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
- not_found ->
- %% 2. Check the cur file cache
- case ets:lookup(CurFileCacheEts, MsgId) of
- [] ->
- Defer = fun() ->
- {server_call(CState, {read, MsgId}), CState}
- end,
- case index_lookup_positive_ref_count(MsgId, CState) of
- not_found -> Defer();
- MsgLocation -> client_read1(MsgLocation, Defer, CState)
- end;
- [{MsgId, Msg, _CacheRefCount}] ->
- %% Although we've found it, we don't know the
- %% refcount, so can't insert into dedup cache
- {{ok, Msg}, CState}
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ %% Check the cur file cache
+ case ets:lookup(CurFileCacheEts, MsgId) of
+ [] ->
+ Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end,
+ case index_lookup_positive_ref_count(MsgId, CState) of
+ not_found -> Defer();
+ MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
- Msg ->
+ [{MsgId, Msg, _CacheRefCount}] ->
{{ok, Msg}, CState}
end.
@@ -457,8 +441,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
server_cast(CState, {remove, CRef, MsgIds}).
-release([], _CState) -> ok;
-release(MsgIds, CState) -> server_cast(CState, {release, MsgIds}).
sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
sync(Server) ->
@@ -517,7 +499,6 @@ client_read2(false, _Right,
client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
gc_pid = GCPid,
client_ref = Ref }) ->
Release =
@@ -574,8 +555,8 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
%% Could the msg_store now mark the file to be
%% closed? No: marks for closing are issued only
%% when the msg_store has locked the file.
- {Msg, CState2} = %% This will never be the current file
- read_from_disk(MsgLocation, CState1, DedupCacheEts),
+ %% This will never be the current file
+ {Msg, CState2} = read_from_disk(MsgLocation, CState1),
Release(), %% this MUST NOT fail with badarg
{{ok, Msg}, CState2};
#msg_location {} = MsgLocation -> %% different file!
@@ -639,7 +620,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
%% CleanShutdown <=> msg location index and file_summary both
%% recovered correctly.
- DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
@@ -669,7 +649,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
dying_clients = sets:new(),
clients = Clients,
@@ -720,14 +699,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
index_module = IndexModule,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
- reply({IndexState, IndexModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { clients = Clients1 });
+ reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
+ CurFileCacheEts}, State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
@@ -781,12 +758,6 @@ handle_cast({remove, CRef, MsgIds}, State) ->
noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
removed, State1)));
-handle_cast({release, MsgIds}, State =
- #msstate { dedup_cache_ets = DedupCacheEts }) ->
- lists:foreach(
- fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds),
- noreply(State);
-
handle_cast({sync, MsgIds, K},
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
@@ -840,7 +811,6 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
dir = Dir }) ->
@@ -856,7 +826,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State3 = close_all_handles(State1),
ok = store_file_summary(FileSummaryEts, Dir),
[true = ets:delete(T) ||
- T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
+ T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
@@ -978,26 +948,18 @@ write_message(MsgId, Msg,
sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize }).
-read_message(MsgId, From,
- State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
+read_message(MsgId, From, State) ->
case index_lookup_positive_ref_count(MsgId, State) of
- not_found ->
- gen_server2:reply(From, not_found),
- State;
- MsgLocation ->
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
- not_found -> read_message1(From, MsgLocation, State);
- Msg -> gen_server2:reply(From, {ok, Msg}),
- State
- end
+ not_found -> gen_server2:reply(From, not_found),
+ State;
+ MsgLocation -> read_message1(From, MsgLocation, State)
end.
-read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset } = MsgLoc,
+read_message1(From, #msg_location { msg_id = MsgId, file = File,
+ offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
case File =:= CurFile of
true -> {Msg, State1} =
@@ -1010,10 +972,8 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
true -> file_handle_cache:flush(CurHdl);
false -> ok
end,
- read_from_disk(MsgLoc, State, DedupCacheEts);
+ read_from_disk(MsgLoc, State);
[{MsgId, Msg1, _CacheRefCount}] ->
- ok = maybe_insert_into_cache(
- DedupCacheEts, RefCount, MsgId, Msg1),
{Msg1, State}
end,
gen_server2:reply(From, {ok, Msg}),
@@ -1023,17 +983,14 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
case Locked of
true -> add_to_pending_gc_completion({read, MsgId, From},
File, State);
- false -> {Msg, State1} =
- read_from_disk(MsgLoc, State, DedupCacheEts),
+ false -> {Msg, State1} = read_from_disk(MsgLoc, State),
gen_server2:reply(From, {ok, Msg}),
State1
end
end.
-read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset,
- total_size = TotalSize },
- State, DedupCacheEts) ->
+read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
+ total_size = TotalSize }, State) ->
{Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
{ok, {MsgId, Msg}} =
@@ -1049,7 +1006,6 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
{proc_dict, get()}
]}}
end,
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
{Msg, State1}.
contains_message(MsgId, From,
@@ -1068,8 +1024,7 @@ contains_message(MsgId, From,
end.
remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
@@ -1090,8 +1045,7 @@ remove_message(MsgId, CRef,
%% don't remove from CUR_FILE_CACHE_ETS_NAME here
%% because there may be further writes in the mailbox
%% for the same msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId),
- case ets:lookup(FileSummaryEts, File) of
+ 1 -> case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true }] ->
add_to_pending_gc_completion(
{remove, MsgId, CRef}, File, State);
@@ -1101,8 +1055,7 @@ remove_message(MsgId, CRef,
File, adjust_valid_total_size(File, -TotalSize,
State))
end;
- _ -> ok = decrement_cache(DedupCacheEts, MsgId),
- ok = Dec(),
+ _ -> ok = Dec(),
State
end
end.
@@ -1325,12 +1278,6 @@ list_sorted_file_names(Dir, Ext) ->
%% message cache helper functions
%%----------------------------------------------------------------------------
-maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg)
- when RefCount > 1 ->
- update_msg_cache(DedupCacheEts, MsgId, Msg);
-maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) ->
- ok.
-
update_msg_cache(CacheEts, MsgId, Msg) ->
case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
true -> ok;
@@ -1339,34 +1286,6 @@ update_msg_cache(CacheEts, MsgId, Msg) ->
fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
end.
-remove_cache_entry(DedupCacheEts, MsgId) ->
- true = ets:delete(DedupCacheEts, MsgId),
- ok.
-
-fetch_and_increment_cache(DedupCacheEts, MsgId) ->
- case ets:lookup(DedupCacheEts, MsgId) of
- [] ->
- not_found;
- [{_MsgId, Msg, _RefCount}] ->
- safe_ets_update_counter_ok(
- DedupCacheEts, MsgId, {3, +1},
- %% someone has deleted us in the meantime, insert us
- fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end),
- Msg
- end.
-
-decrement_cache(DedupCacheEts, MsgId) ->
- true = safe_ets_update_counter(
- DedupCacheEts, MsgId, {3, -1},
- fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId);
- (_N) -> true
- end,
- %% MsgId is not in there because although it's been
- %% delivered, it's never actually been read (think:
- %% persistent message held in RAM)
- fun () -> true end),
- ok.
-
%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------
@@ -2007,7 +1926,10 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
rabbit_msg_file:scan(
RefOld, filelib:file_size(FileOld),
fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
- {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
+ {ok, MsgNew} = case binary_to_term(BinMsg) of
+ <<>> -> {ok, <<>>}; %% dying client marker
+ Msg -> TransformFun(Msg)
+ end,
{ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
ok
end, ok),
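
The bulk of the patch above deletes the message-store dedup cache. For context, that cache was a read-through ETS table keyed by message id, with a per-entry reference count maintained via ets:update_counter/3. Below is a minimal sketch of that pattern, not the RabbitMQ implementation: the module and function names are illustrative only, and unlike the removed code it does not wrap the counter updates in the safe_ets_update_counter helpers seen in the diff, which guard against a concurrent delete between the lookup and the update.

%% Minimal sketch of the ETS refcounting pattern behind the removed
%% dedup cache. Illustrative names only; not the rabbit_msg_store code.
-module(dedup_cache_sketch).
-export([new/0, insert/3, fetch/2, release/2]).

new() ->
    ets:new(?MODULE, [set, public]).

%% Cache a message, or bump its reference count if it is already present.
insert(Cache, MsgId, Msg) ->
    case ets:insert_new(Cache, {MsgId, Msg, 1}) of
        true  -> ok;
        false -> _ = ets:update_counter(Cache, MsgId, {3, +1}),
                 ok
    end.

%% Read-through lookup: a hit also counts as one more reference.
fetch(Cache, MsgId) ->
    case ets:lookup(Cache, MsgId) of
        []                        -> not_found;
        [{MsgId, Msg, _RefCount}] -> _ = ets:update_counter(Cache, MsgId, {3, +1}),
                                     Msg
    end.

%% Drop one reference; delete the entry once nobody holds it any more.
release(Cache, MsgId) ->
    case ets:update_counter(Cache, MsgId, {3, -1}) of
        N when N =< 0 -> true = ets:delete(Cache, MsgId),
                         ok;
        _             -> ok
    end.
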
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index fb1c9a34..294fae97 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1786,8 +1786,6 @@ test_msg_store() ->
true = msg_store_contains(true, MsgIds2ndHalf, MSCState2),
%% read the second half again
MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2),
- %% release the second half, just for fun (aka code coverage)
- ok = rabbit_msg_store:release(MsgIds2ndHalf, MSCState3),
%% read the second half again, just for fun (aka code coverage)
MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
ok = rabbit_msg_store:client_terminate(MSCState4),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7a1102e5..ff7252fd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -728,7 +728,7 @@ requeue(AckTags, MsgPropsFun, State) ->
needs_confirming = false }
end,
a(reduce_memory_use(
- ack(fun msg_store_release/3,
+ ack(fun (_, _, _) -> ok end,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
true, false, State1),
@@ -972,11 +972,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) ->
MSCState, IsPersistent,
fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
-msg_store_release(MSCState, IsPersistent, MsgIds) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:release(MsgIds, MCSState1) end).
-
msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
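
In the rabbit_variable_queue change above, requeue/3 swaps fun msg_store_release/3 for an inline no-op of the same arity, because the fun handed to ack is applied as Fun(MSCState, IsPersistent, MsgIds); any replacement only has to keep that shape. The sketch below illustrates that calling convention with hypothetical stand-ins, not the real rabbit_variable_queue API.

%% Minimal sketch of the three-argument msg-store fun contract used by
%% requeue-style code. All names are hypothetical stand-ins.
-module(requeue_ack_sketch).
-export([demo/0]).

%% Stand-in for an ack-like helper: republish each message via PerMsgFun,
%% then hand the whole batch of ids to MsgStoreFun in one call.
ack(MsgStoreFun, PerMsgFun, Acks, {MSCState, QState}) ->
    QState1 = lists:foldl(PerMsgFun, QState, Acks),
    MsgIds  = [MsgId || {MsgId, _Msg} <- Acks],
    ok = MsgStoreFun(MSCState, true, MsgIds),   %% IsPersistent = true
    {MSCState, QState1}.

demo() ->
    Acks    = [{<<"id-1">>, <<"payload-1">>}, {<<"id-2">>, <<"payload-2">>}],
    %% The no-op keeps the (MSCState, IsPersistent, MsgIds) arity, just as
    %% the inline fun in requeue/3 replaces the removed msg_store_release/3.
    NoOp    = fun (_MSCState, _IsPersistent, _MsgIds) -> ok end,
    Requeue = fun ({_MsgId, Msg}, Q) -> [Msg | Q] end,
    {mscstate_stub, Republished} = ack(NoOp, Requeue, Acks, {mscstate_stub, []}),
    Republished.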