summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-19 10:42:25 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-19 10:42:25 +0000
commitc9dc04c1ac3e2eb78c2c6bd4e83c7fb4f60950b5 (patch)
treeaa7ee399fe602794dcb444d8796c992284058c03
parentcfa09e5fd38baa616b01c002e7e9e34bdc437b56 (diff)
parent7f6191345de6b256b1fce92626b4df43fc6bad7e (diff)
downloadrabbitmq-server-c9dc04c1ac3e2eb78c2c6bd4e83c7fb4f60950b5.tar.gz
merge from default
-rw-r--r--docs/rabbitmq-server.1.xml2
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_binding.erl13
-rw-r--r--src/rabbit_channel.erl161
-rw-r--r--src/rabbit_misc.erl5
-rw-r--r--src/rabbit_msg_store.erl289
-rw-r--r--src/rabbit_msg_store_gc.erl10
-rw-r--r--src/rabbit_tests.erl54
-rw-r--r--src/rabbit_variable_queue.erl32
10 files changed, 341 insertions, 247 deletions
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index 03e76c79..687a9c39 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -21,7 +21,7 @@
<refsynopsisdiv>
<cmdsynopsis>
- <command>rabbitmq-multi</command>
+ <command>rabbitmq-server</command>
<arg choice="opt">-detached</arg>
</cmdsynopsis>
</refsynopsisdiv>
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9626e126..7ba5a17f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
maybe_run_queue_via_backing_queue_async/2,
- update_ram_duration/1, set_ram_duration_target/2,
+ sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
@@ -69,6 +69,8 @@
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
+
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(declare/5 ::
@@ -147,7 +149,7 @@
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
- -> rabbit_types:amqqueue() | 'not_found').
+ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit()).
@@ -155,6 +157,7 @@
(pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(maybe_run_queue_via_backing_queue_async/2 ::
(pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
+-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -463,6 +466,9 @@ maybe_run_queue_via_backing_queue(QPid, Fun) ->
maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+sync_timeout(QPid) ->
+ gen_server2:cast(QPid, sync_timeout).
+
update_ram_duration(QPid) ->
gen_server2:cast(QPid, update_ram_duration).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 38b83117..fa425fa8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -224,11 +224,9 @@ next_state(State) ->
false -> {stop_sync_timer(State2), hibernate}
end.
-ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
+ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL,
- rabbit_amqqueue, maybe_run_queue_via_backing_queue,
- [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]),
+ ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -797,6 +795,7 @@ prioritise_cast(Msg, _State) ->
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;
+ sync_timeout -> 6;
_ -> 0
end.
@@ -1017,6 +1016,11 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
noreply(maybe_run_queue_via_backing_queue(Fun, State));
+handle_cast(sync_timeout, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS),
+ sync_timer_ref = undefined});
+
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 74fd00b7..9742c4b6 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -59,17 +59,18 @@
rabbit_types:exchange() | rabbit_types:amqqueue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-type(bindings() :: [rabbit_types:binding()]).
+-type(add_res() :: bind_res() | rabbit_misc:const(bind_res())).
+-type(bind_or_error() :: bind_res() | rabbit_types:error('binding_not_found')).
+-type(remove_res() :: bind_or_error() | rabbit_misc:const(bind_or_error())).
-opaque(deletions() :: dict()).
-spec(recover/0 :: () -> [rabbit_types:binding()]).
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
--spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
--spec(remove/1 :: (rabbit_types:binding()) ->
- bind_res() | rabbit_types:error('binding_not_found')).
--spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()).
--spec(remove/2 :: (rabbit_types:binding(), inner_fun()) ->
- bind_res() | rabbit_types:error('binding_not_found')).
+-spec(add/1 :: (rabbit_types:binding()) -> add_res()).
+-spec(remove/1 :: (rabbit_types:binding()) -> remove_res()).
+-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> add_res()).
+-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> remove_res()).
-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
-spec(list_for_source/1 ::
(rabbit_types:binding_source()) -> bindings()).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 78ecb774..ca424335 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed}).
+ confirm_enabled, publish_seqno, unconfirmed, confirmed}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -188,7 +188,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = gb_trees:empty()},
+ unconfirmed = gb_trees:empty(),
+ confirmed = []},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -204,8 +205,9 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- emit_stats -> 7;
- _ -> 0
+ emit_stats -> 7;
+ {confirm, _MsgSeqNos, _QPid} -> 5;
+ _ -> 0
end.
handle_call(flush, _From, State) ->
@@ -280,30 +282,27 @@ handle_cast({deliver, ConsumerTag, AckRequired,
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
internal_emit_stats(State),
- {noreply,
- State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
- hibernate};
+ noreply([ensure_stats_timer],
+ State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
-handle_cast({confirm, MsgSeqNos, From},
- State= #ch{stats_timer = StatsTimer}) ->
- case rabbit_event:stats_level(StatsTimer) of
- fine ->
- {noreply, group_and_confirm(MsgSeqNos, From, State), hibernate};
- _ ->
- {noreply, nogroup_confirm(MsgSeqNos, From, State), hibernate}
- end.
+handle_cast({confirm, MsgSeqNos, From}, State) ->
+ State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
+ noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+
+handle_info(timeout, State) ->
+ noreply(State);
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{unconfirmed = UC}) ->
%% TODO: this does a complete scan and partial rebuild of the
%% tree, which is quite efficient. To do better we'd need to
%% maintain a secondary mapping, from QPids to MsgSeqNos.
- {EMs, UC1} = remove_queue_unconfirmed(
+ {MEs, UC1} = remove_queue_unconfirmed(
gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}),
- State1 = confirm_grouped(EMs, State#ch{unconfirmed = UC1}),
+ {[], UC}, State),
erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State1), hibernate}.
+ noreply(
+ queue_blocked(QPid, record_confirms(MEs, State#ch{unconfirmed = UC1}))).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -333,11 +332,24 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
-reply(Reply, NewState) ->
- {reply, Reply, ensure_stats_timer(NewState), hibernate}.
+reply(Reply, NewState) -> reply(Reply, [], NewState).
+
+reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate).
+
+reply(Reply, Mask, NewState, Timeout) ->
+ {reply, Reply, next_state(Mask, NewState), Timeout}.
+
+noreply(NewState) -> noreply([], NewState).
+
+noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate).
+
+noreply(Mask, NewState, Timeout) ->
+ {noreply, next_state(Mask, NewState), Timeout}.
-noreply(NewState) ->
- {noreply, ensure_stats_timer(NewState), hibernate}.
+next_state(Mask, State) ->
+ lists:foldl(fun (ensure_stats_timer, State1) -> ensure_stats_timer(State1);
+ (send_confirms, State1) -> send_confirms(State1)
+ end, State, [ensure_stats_timer, send_confirms] -- Mask).
ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
@@ -476,61 +488,42 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QX, Acc) ->
+remove_queue_unconfirmed(none, _QX, Acc, _State) ->
Acc;
-remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc) ->
+remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc, State) ->
remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, QX, Acc)).
+ remove_qmsg(MsgSeqNo, QPid, QX, Acc, State),
+ State).
-group_and_confirm([], _QPid, State) ->
+record_confirm(undefined, _, State) ->
State;
-group_and_confirm(MsgSeqNos, QPid, State) ->
- {EMs, UC1} = take_from_unconfirmed(MsgSeqNos, QPid, State),
- confirm_grouped(EMs, State#ch{unconfirmed=UC1}).
-
-confirm_grouped(EMs, State) ->
- case lists:usort(EMs) of
- [{XName, MsgSeqNo} | EMs1] ->
- lists:foldl(
- fun({XName1, MsgSeqNosE}, State0) ->
- send_confirms(MsgSeqNosE, XName1, State0)
- end, State,
- group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}]));
- [] ->
- State
- end.
-
-group_confirms_by_exchange([], Acc) ->
- Acc;
-group_confirms_by_exchange([{E, Msg1} | EMs], [{E, Msgs} | Acc]) ->
- group_confirms_by_exchange(EMs, [{E, [Msg1 | Msgs]} | Acc]);
-group_confirms_by_exchange([{E, Msg1} | EMs], Acc) ->
- group_confirms_by_exchange(EMs, [{E, [Msg1]} | Acc]).
+record_confirm(MsgSeqNo, XName, State) ->
+ record_confirms([{MsgSeqNo, XName}], State).
-nogroup_confirm([], _QPid, State) ->
+record_confirms([], State) ->
State;
-nogroup_confirm(MsgSeqNos, QPid, State) ->
- {EMs, UC1} = take_from_unconfirmed(MsgSeqNos, QPid, State),
- DoneMessages = [MsgSeqNo || {_XName, MsgSeqNo} <- EMs],
- send_confirms(DoneMessages, undefined, State#ch{unconfirmed = UC1}).
+record_confirms(MEs, State = #ch{confirmed = C}) ->
+ State#ch{confirmed = [MEs | C]}.
-take_from_unconfirmed(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- lists:foldl(
- fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UC0) of
- none -> Acc;
- {value, {_, XName} = QX} ->
- maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
- remove_qmsg(MsgSeqNo, QPid, QX, Acc)
- end
- end, {[], UC}, MsgSeqNos).
-
-remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {XMs, UC}) ->
- %% remove QPid from MsgSeqNo's mapping
+confirm([], _QPid, State) ->
+ State;
+confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ {MEs, UC1} =
+ lists:foldl(
+ fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UC0) of
+ none -> Acc;
+ {value,QX} -> remove_qmsg(MsgSeqNo, QPid, QX, Acc, State)
+ end
+ end, {[], UC}, MsgSeqNos),
+ record_confirms(MEs, State#ch{unconfirmed = UC1}).
+
+remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {MEs, UC}, State) ->
Qs1 = sets:del_element(QPid, Qs),
+ maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
case sets:size(Qs1) of
- 0 -> {[{XName, MsgSeqNo} | XMs], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {XMs, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)}
+ 0 -> {[{MsgSeqNo, XName} | MEs], gb_trees:delete(MsgSeqNo, UC)};
+ _ -> {MEs, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -1252,12 +1245,12 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State#ch.writer_pid, no_route),
- send_confirms([MsgSeqNo], XName, State);
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
- send_confirms([MsgSeqNo], XName, State);
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
- send_confirms([MsgSeqNo], XName, State);
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
@@ -1271,15 +1264,19 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-send_confirms([], _, State) ->
+send_confirms(State = #ch{confirmed = C, stats_timer = StatsTimer}) ->
+ MsgSeqNos = case rabbit_event:stats_level(StatsTimer) of
+ fine -> incr_confirm_exchange_stats(C, State);
+ _ -> [MsgSeqNo || {MsgSeqNo, _} <- C]
+ end,
+ send_confirms(MsgSeqNos, State #ch{confirmed = []}).
+send_confirms([], State) ->
State;
-send_confirms([MsgSeqNo], XName,
+send_confirms([MsgSeqNo],
State = #ch{writer_pid = WriterPid}) ->
send_confirm(MsgSeqNo, WriterPid),
- maybe_incr_stats([{XName, 1}], confirm, State),
State;
-send_confirms(Cs, XName,
- State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SCs = lists:usort(Cs),
CutOff = case gb_trees:is_empty(UC) of
true -> lists:last(SCs) + 1;
@@ -1293,11 +1290,15 @@ send_confirms(Cs, XName,
multiple = true})
end,
[ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
- maybe_incr_stats([{XName, length(Cs)}], confirm, State),
State.
-send_confirm(undefined, _WriterPid) ->
- ok;
+incr_confirm_exchange_stats(C, State) ->
+ lists:foldl(
+ fun({MsgSeqNo, ExchangeName}, MsgSeqNos0) ->
+ maybe_incr_stats([{ExchangeName, 1}], confirm, State),
+ [MsgSeqNo | MsgSeqNos0]
+ end, [], lists:append(C)).
+
send_confirm(SeqNo, WriterPid) ->
ok = rabbit_writer:send_command(WriterPid,
#'basic.ack'{delivery_tag = SeqNo}).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9e8ba91b..14f03a77 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -75,10 +75,11 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1]).
+-export_type([resource_name/0, thunk/1, const/1]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
+-type(const(T) :: fun((any()) -> T)).
-type(resource_name() :: binary()).
-type(optdef() :: {flag, string()} | {option, string(), any()}).
-type(channel_or_connection_exit()
@@ -204,7 +205,7 @@
-spec(now_ms/0 :: () -> non_neg_integer()).
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
-spec(const_ok/1 :: (any()) -> 'ok').
--spec(const/1 :: (A) -> fun ((_) -> A)).
+-spec(const/1 :: (A) -> const(A)).
-endif.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f8b41ed3..80e319dd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,8 +34,8 @@
-behaviour(gen_server2).
-export([start_link/4, successfully_recovered_state/1,
- client_init/3, client_terminate/1, client_delete_and_terminate/1,
- client_ref/1,
+ client_init/4, client_terminate/1, client_delete_and_terminate/1,
+ client_ref/1, close_all_indicated/1,
write/3, read/2, contains/2, remove/2, release/2, sync/3]).
-export([sync/1, set_maximum_since_use/2,
@@ -82,10 +82,10 @@
dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
dying_clients, %% set of dying clients
- client_refs, %% set of references of all registered clients
+ clients, %% map of references of all registered clients
+ %% to callbacks
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
- client_ondisk_callback, %% client ref to callback function mapping
cref_to_guids %% client ref to synced messages mapping
}).
@@ -111,6 +111,7 @@
index_module,
index_state,
file_summary_ets,
+ file_handles_ets,
msg_store
}).
@@ -124,6 +125,7 @@
index_module :: atom(),
index_state :: any(),
file_summary_ets :: ets:tid(),
+ file_handles_ets :: ets:tid(),
msg_store :: server()
}).
@@ -146,13 +148,15 @@
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
-type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())).
+-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
+-type(deletion_thunk() :: fun (() -> boolean())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/3 :: (server(), client_ref(), maybe_guid_fun()) ->
- client_msstate()).
+-spec(client_init/4 :: (server(), client_ref(), maybe_guid_fun(),
+ maybe_close_fds_fun()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
@@ -169,8 +173,8 @@
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
- 'ok').
--spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> 'ok').
+ deletion_thunk()).
+-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
-endif.
@@ -380,9 +384,9 @@
%%
%% We use a separate set to keep track of the dying clients in order
%% to keep that set, which is inspected on every write and remove, as
-%% small as possible. Inspecting client_refs - the set of all clients
-%% - would degrade performance with many healthy clients and few, if
-%% any, dying clients, which is the typical case.
+%% small as possible. Inspecting the set of all clients would degrade
+%% performance with many healthy clients and few, if any, dying
+%% clients, which is the typical case.
%%
%% For notes on Clean Shutdown and startup, see documentation in
%% variable_queue.
@@ -399,11 +403,11 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
-client_init(Server, Ref, MsgOnDiskFun) ->
+client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun},
- infinity),
+ gen_server2:call(
+ Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -523,7 +527,8 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
- gc_pid = GCPid }) ->
+ gc_pid = GCPid,
+ client_ref = Ref }) ->
Release =
fun() -> ok = case ets:update_counter(FileSummaryEts, File,
{#file_summary.readers, -1}) of
@@ -570,9 +575,14 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
case index_lookup(Guid, CState) of
#msg_location { file = File } = MsgLocation ->
%% Still the same file.
- mark_handle_open(FileHandlesEts, File),
-
- CState1 = close_all_indicated(CState),
+ {ok, CState1} = close_all_indicated(CState),
+ %% We are now guaranteed that the mark_handle_open
+ %% call will either insert_new correctly, or will
+ %% fail, but find the value is open, not close.
+ mark_handle_open(FileHandlesEts, File, Ref),
+ %% Could the msg_store now mark the file to be
+ %% closed? No: marks for closing are issued only
+ %% when the msg_store has locked the file.
{Msg, CState2} = %% This will never be the current file
read_from_disk(MsgLocation, CState1, DedupCacheEts),
Release(), %% this MUST NOT fail with badarg
@@ -588,11 +598,10 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
end
end.
-clear_client_callback(CRef,
- State = #msstate { client_ondisk_callback = CODC,
- cref_to_guids = CTG }) ->
- State #msstate { client_ondisk_callback = dict:erase(CRef, CODC),
- cref_to_guids = dict:erase(CRef, CTG)}.
+clear_client(CRef, State = #msstate { cref_to_guids = CTG,
+ dying_clients = DyingClients }) ->
+ State #msstate { cref_to_guids = dict:erase(CRef, CTG),
+ dying_clients = sets:del_element(CRef, DyingClients) }.
%%----------------------------------------------------------------------------
@@ -628,6 +637,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
{CleanShutdown, IndexState, ClientRefs1} =
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
ClientRefs, Dir, Server),
+ Clients = dict:from_list(
+ [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]),
%% CleanShutdown => msg location index and file_summary both
%% recovered correctly.
true = case {FileSummaryRecovered, CleanShutdown} of
@@ -661,10 +672,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
dying_clients = sets:new(),
- client_refs = ClientRefs1,
+ clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- client_ondisk_callback = dict:new(),
cref_to_guids = dict:new()
},
@@ -684,6 +694,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
index_module = IndexModule,
index_state = IndexState,
file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
msg_store = self()
}),
@@ -694,10 +705,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- successfully_recovered_state -> 7;
- {new_client_state, _Ref, _MODC} -> 7;
- {read, _Guid} -> 2;
- _ -> 0
+ successfully_recovered_state -> 7;
+ {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
+ {read, _Guid} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -713,7 +724,7 @@ prioritise_cast(Msg, _State) ->
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({new_client_state, CRef, Callback}, _From,
+handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
State = #msstate { dir = Dir,
index_state = IndexState,
index_module = IndexModule,
@@ -721,21 +732,15 @@ handle_call({new_client_state, CRef, Callback}, _From,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
- client_refs = ClientRefs,
- client_ondisk_callback = CODC,
+ clients = Clients,
gc_pid = GCPid }) ->
- CODC1 = case Callback of
- undefined -> CODC;
- _ -> dict:store(CRef, Callback, CODC)
- end,
+ Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { client_refs = sets:add_element(CRef, ClientRefs),
- client_ondisk_callback = CODC1 });
+ State #msstate { clients = Clients1 });
-handle_call({client_terminate, CRef}, _From,
- State) ->
- reply(ok, clear_client_callback(CRef, State));
+handle_call({client_terminate, CRef}, _From, State) ->
+ reply(ok, clear_client(CRef, State));
handle_call({read, Guid}, From, State) ->
State1 = read_message(Guid, From, State),
@@ -750,36 +755,26 @@ handle_cast({client_dying, CRef},
DyingClients1 = sets:add_element(CRef, DyingClients),
write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
-handle_cast({client_delete, CRef},
- State = #msstate { client_refs = ClientRefs,
- dying_clients = DyingClients }) ->
- State1 = clear_client_callback(
- CRef, State #msstate {
- client_refs = sets:del_element(CRef, ClientRefs),
- dying_clients = sets:del_element(CRef, DyingClients) }),
- noreply(remove_message(CRef, CRef, State1));
+handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
+ State1 = State #msstate { clients = dict:erase(CRef, Clients) },
+ noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, Guid},
- State = #msstate { file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts,
- client_ondisk_callback = CODC,
- cref_to_guids = CTG }) ->
-
+ State = #msstate { file_summary_ets = FileSummaryEts,
+ cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC),
- State1 = State #msstate { cref_to_guids = CTG1 },
case should_mask_action(CRef, Guid, State) of
{true, _Location} ->
noreply(State);
{false, not_found} ->
- write_message(Guid, Msg, State1);
+ write_message(CRef, Guid, Msg, State);
{Mask, #msg_location { ref_count = 0, file = File,
total_size = TotalSize }} ->
case {Mask, ets:lookup(FileSummaryEts, File)} of
{false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State1),
- write_message(Guid, Msg, State1);
+ ok = index_delete(Guid, State),
+ write_message(CRef, Guid, Msg, State);
{false_if_increment, [#file_summary { locked = true }]} ->
%% The msg for Guid is older than the client death
%% message, but as it is being GC'd currently,
@@ -788,8 +783,8 @@ handle_cast({write, CRef, Guid},
noreply(State);
{_Mask, [#file_summary {}]} ->
ok = index_update_ref_count(Guid, 1, State),
- State2 = client_confirm_if_on_disk(CRef, Guid, File, State),
- noreply(adjust_valid_total_size(File, TotalSize, State2))
+ State1 = client_confirm_if_on_disk(CRef, Guid, File, State),
+ noreply(adjust_valid_total_size(File, TotalSize, State1))
end;
{_Mask, #msg_location { ref_count = RefCount, file = File }} ->
%% We already know about it, just update counter. Only
@@ -832,10 +827,11 @@ handle_cast(sync, State) ->
handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts }) ->
+ file_summary_ets = FileSummaryEts,
+ clients = Clients }) ->
ok = cleanup_after_file_deletion(Source, State),
- %% see comment in cleanup_after_file_deletion
- true = mark_handle_to_close(FileHandlesEts, Destination),
+ %% see comment in cleanup_after_file_deletion, and client_read3
+ true = mark_handle_to_close(Clients, FileHandlesEts, Destination, false),
true = ets:update_element(FileSummaryEts, Destination,
{#file_summary.locked, false}),
State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
@@ -865,7 +861,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
- client_refs = ClientRefs,
+ clients = Clients,
dir = Dir }) ->
%% stop the gc first, otherwise it could be working and we pull
%% out the ets tables from under it.
@@ -881,7 +877,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
[ets:delete(T) ||
T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
- store_recovery_terms([{client_refs, sets:to_list(ClientRefs)},
+ store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -943,6 +939,9 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
[client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+write_message(CRef, Guid, Msg, State) ->
+ write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).
+
write_message(Guid, Msg,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
@@ -1135,46 +1134,44 @@ orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-client_confirm(CRef, Guids, ActionTaken,
- State = #msstate { client_ondisk_callback = CODC,
- cref_to_guids = CTG }) ->
- case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(Guids, ActionTaken),
- CTG1 = case dict:find(CRef, CTG) of
- {ok, Gs} ->
- Guids1 = gb_sets:difference(Gs, Guids),
- case gb_sets:is_empty(Guids1) of
- true -> dict:erase(CRef, CTG);
- false -> dict:store(CRef, Guids1, CTG)
- end;
- error -> CTG
- end,
- State #msstate { cref_to_guids = CTG1 };
- error -> State
- end.
-
-add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) ->
- case dict:find(CRef, CODC) of
- {ok, _} -> dict:update(CRef,
- fun (Guids) -> gb_sets:add(Guid, Guids) end,
- gb_sets:singleton(Guid), CTG);
- error -> CTG
+update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients,
+ cref_to_guids = CTG }) ->
+ case dict:fetch(CRef, Clients) of
+ {undefined, _CloseFDsFun} -> State;
+ {MsgOnDiskFun, _CloseFDsFun} -> CTG1 = Fun(MsgOnDiskFun, CTG),
+ State #msstate { cref_to_guids = CTG1 }
end.
-client_confirm_if_on_disk(CRef, Guid, File,
- State = #msstate { client_ondisk_callback = CODC,
- current_file = CurFile,
- cref_to_guids = CTG }) ->
- CTG1 =
- case File of
- CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC);
- _ -> case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(gb_sets:singleton(Guid), written);
- _ -> ok
- end,
- CTG
- end,
- State #msstate { cref_to_guids = CTG1 }.
+record_pending_confirm(CRef, Guid, State) ->
+ update_pending_confirms(
+ fun (_MsgOnDiskFun, CTG) ->
+ dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end,
+ gb_sets:singleton(Guid), CTG)
+ end, CRef, State).
+
+client_confirm_if_on_disk(CRef, Guid, CurFile,
+ State = #msstate { current_file = CurFile }) ->
+ record_pending_confirm(CRef, Guid, State);
+client_confirm_if_on_disk(CRef, Guid, _File, State) ->
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTG) ->
+ MsgOnDiskFun(gb_sets:singleton(Guid), written),
+ CTG
+ end, CRef, State).
+
+client_confirm(CRef, Guids, ActionTaken, State) ->
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTG) ->
+ MsgOnDiskFun(Guids, ActionTaken),
+ case dict:find(CRef, CTG) of
+ {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids),
+ case gb_sets:is_empty(Guids1) of
+ true -> dict:erase(CRef, CTG);
+ false -> dict:store(CRef, Guids1, CTG)
+ end;
+ error -> CTG
+ end
+ end, CRef, State).
%% Detect whether the Guid is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
@@ -1221,29 +1218,54 @@ close_handle(Key, FHC) ->
error -> FHC
end.
-mark_handle_open(FileHandlesEts, File) ->
- %% This is fine to fail (already exists)
- ets:insert_new(FileHandlesEts, {{self(), File}, open}),
+mark_handle_open(FileHandlesEts, File, Ref) ->
+ %% This is fine to fail (already exists). Note it could fail with
+ %% the value being close, and not have it updated to open.
+ ets:insert_new(FileHandlesEts, {{Ref, File}, open}),
true.
-mark_handle_to_close(FileHandlesEts, File) ->
- [ ets:update_element(FileHandlesEts, Key, {2, close})
- || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
+%% See comment in client_read3 - only call this when the file is locked
+mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
+ [ begin
+ case (ets:update_element(FileHandlesEts, Key, {2, close})
+ andalso Invoke) of
+ true -> case dict:fetch(Ref, ClientRefs) of
+ {_MsgOnDiskFun, undefined} -> ok;
+ {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun()
+ end;
+ false -> ok
+ end
+ end || {{Ref, _File} = Key, open} <-
+ ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
true.
-close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
+safe_file_delete_fun(File, Dir, FileHandlesEts) ->
+ fun () -> safe_file_delete(File, Dir, FileHandlesEts) end.
+
+safe_file_delete(File, Dir, FileHandlesEts) ->
+ %% do not match on any value - it's the absence of the row that
+ %% indicates the client has really closed the file.
+ case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of
+ {[_|_], _Cont} -> false;
+ _ -> ok = file:delete(
+ form_filename(Dir, filenum_to_name(File))),
+ true
+ end.
+
+close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
+ client_ref = Ref } =
CState) ->
- Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}),
- lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) ->
- true = ets:delete(FileHandlesEts, Key),
- close_handle(File, CStateM)
- end, CState, Objs).
-
-close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
- file_handle_cache = FHC }) ->
- Self = self(),
+ Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
+ {ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) ->
+ true = ets:delete(FileHandlesEts, Key),
+ close_handle(File, CStateM)
+ end, CState, Objs)}.
+
+close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
+ file_handle_cache = FHC,
+ client_ref = Ref }) ->
ok = dict:fold(fun (File, Hdl, ok) ->
- true = ets:delete(FileHandlesEts, {Self, File}),
+ true = ets:delete(FileHandlesEts, {Ref, File}),
file_handle_cache:close(Hdl)
end, ok, FHC),
CState #client_msstate { file_handle_cache = dict:new() };
@@ -1381,16 +1403,16 @@ index_delete_by_file(File, #msstate { index_module = Index,
%%----------------------------------------------------------------------------
recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
- {false, IndexModule:new(Dir), sets:new()};
+ {false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
- {false, IndexModule:new(Dir), sets:new()};
+ {false, IndexModule:new(Dir), []};
recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
Fresh = fun (ErrorMsg, ErrorArgs) ->
rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
"rebuilding indices from scratch~n",
[Server | ErrorArgs]),
- {false, IndexModule:new(Dir), sets:new()}
+ {false, IndexModule:new(Dir), []}
end,
case read_recovery_terms(Dir) of
{false, Error} ->
@@ -1402,8 +1424,7 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
andalso IndexModule =:= RecIndexModule) of
true -> case IndexModule:recover(Dir) of
{ok, IndexState1} ->
- {true, IndexState1,
- sets:from_list(ClientRefs)};
+ {true, IndexState1, ClientRefs};
{error, Error} ->
Fresh("failed to recover index: ~p", [Error])
end;
@@ -1744,7 +1765,8 @@ delete_file_if_empty(File, State = #msstate {
cleanup_after_file_deletion(File,
#msstate { file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts }) ->
+ file_summary_ets = FileSummaryEts,
+ clients = Clients }) ->
%% Ensure that any clients that have open fhs to the file close
%% them before using them again. This has to be done here (given
%% it's done in the msg_store, and not the gc), and not when
@@ -1752,7 +1774,7 @@ cleanup_after_file_deletion(File,
%% the client could find the close, and close and reopen the fh,
%% whilst the GC is waiting for readers to disappear, before it's
%% actually done the GC.
- true = mark_handle_to_close(FileHandlesEts, File),
+ true = mark_handle_to_close(Clients, FileHandlesEts, File, true),
[#file_summary { left = Left,
right = Right,
locked = true,
@@ -1781,6 +1803,7 @@ has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
combine_files(Source, Destination,
State = #gc_state { file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
dir = Dir,
msg_store = Server }) ->
[#file_summary {
@@ -1841,7 +1864,7 @@ combine_files(Source, Destination,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
ok = file_handle_cache:close(DestinationHdl),
- ok = file_handle_cache:delete(SourceHdl),
+ ok = file_handle_cache:close(SourceHdl),
%% don't update dest.right, because it could be changing at the
%% same time
@@ -1851,9 +1874,11 @@ combine_files(Source, Destination,
{#file_summary.file_size, TotalValidData}]),
Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
- gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}).
+ gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
+ safe_file_delete_fun(Source, Dir, FileHandlesEts).
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
dir = Dir,
msg_store = Server }) ->
[#file_summary { valid_total_size = 0,
@@ -1861,8 +1886,8 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
file_size = FileSize,
readers = 0 }] = ets:lookup(FileSummaryEts, File),
{[], 0} = load_and_vacuum_message_file(File, State),
- ok = file:delete(form_filename(Dir, filenum_to_name(File))),
- gen_server2:cast(Server, {delete_file, File, FileSize}).
+ gen_server2:cast(Server, {delete_file, File, FileSize}),
+ safe_file_delete_fun(File, Dir, FileHandlesEts).
load_and_vacuum_message_file(File, #gc_state { dir = Dir,
index_module = Index,
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index cd9fd497..428f8b10 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -42,6 +42,7 @@
-record(state,
{ pending_no_readers,
+ on_action,
msg_store_state
}).
@@ -89,6 +90,7 @@ init([MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
{ok, #state { pending_no_readers = dict:new(),
+ on_action = [],
msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -131,11 +133,15 @@ code_change(_OldVsn, State, _Extra) ->
attempt_action(Action, Files,
State = #state { pending_no_readers = Pending,
+ on_action = Thunks,
msg_store_state = MsgStoreState }) ->
case [File || File <- Files,
rabbit_msg_store:has_readers(File, MsgStoreState)] of
- [] -> do_action(Action, Files, MsgStoreState),
- State;
+ [] -> State #state {
+ on_action = lists:filter(
+ fun (Thunk) -> not Thunk() end,
+ [do_action(Action, Files, MsgStoreState) |
+ Thunks]) };
[File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending),
State #state { pending_no_readers = Pending1 }
end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1709ef3c..91920f9b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1443,6 +1443,7 @@ test_backing_queue() ->
passed = test_queue_index(),
passed = test_queue_index_props(),
passed = test_variable_queue(),
+ passed = test_variable_queue_delete_msg_store_files_callback(),
passed = test_queue_recover(),
application:set_env(rabbit, queue_index_max_journal_entries,
MaxJournal, infinity),
@@ -1459,6 +1460,9 @@ restart_msg_store_empty() ->
guid_bin(X) ->
erlang:md5(term_to_binary(X)).
+msg_store_client_init(MsgStore, Ref) ->
+ rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
+
msg_store_contains(Atom, Guids, MSCState) ->
Atom = lists:foldl(
fun (Guid, Atom1) when Atom1 =:= Atom ->
@@ -1502,12 +1506,12 @@ msg_store_remove(MsgStore, Ref, Guids) ->
with_msg_store_client(MsgStore, Ref, Fun) ->
rabbit_msg_store:client_terminate(
- Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined))).
+ Fun(msg_store_client_init(MsgStore, Ref))).
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref, undefined), L)).
+ msg_store_client_init(MsgStore, Ref), L)).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1515,8 +1519,7 @@ test_msg_store() ->
Guids = [guid_bin(M) || M <- lists:seq(1,100)],
{Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids),
Ref = rabbit_guid:guid(),
- MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
- undefined),
+ MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, Guids, MSCState),
%% publish the first half
@@ -1582,8 +1585,7 @@ test_msg_store() ->
([Guid|GuidsTail]) ->
{Guid, 0, GuidsTail}
end, Guids2ndHalf}),
- MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
- undefined),
+ MSCState5 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we have the right msgs left
lists:foldl(
fun (Guid, Bool) ->
@@ -1592,8 +1594,7 @@ test_msg_store() ->
ok = rabbit_msg_store:client_terminate(MSCState5),
%% restart empty
restart_msg_store_empty(),
- MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
- undefined),
+ MSCState6 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we don't contain any of the msgs
false = msg_store_contains(false, Guids, MSCState6),
%% publish the first half again
@@ -1601,8 +1602,7 @@ test_msg_store() ->
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
msg_store_read(Guids1stHalf, MSCState6)),
- MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
- undefined),
+ MSCState7 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7),
ok = rabbit_msg_store:client_terminate(MSCState7),
%% restart empty
@@ -1660,8 +1660,7 @@ init_test_queue() ->
TestQueue = test_queue(),
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
- PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
- PRef, undefined),
+ PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
TestQueue, Terms, false,
fun (Guid) ->
@@ -1695,7 +1694,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
end,
- MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined),
+ MSCState = msg_store_client_init(MsgStore, Ref),
{A, B = [{_SeqId, LastGuidWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
@@ -2123,6 +2122,35 @@ test_queue_recover() ->
end),
passed.
+test_variable_queue_delete_msg_store_files_callback() ->
+ ok = restart_msg_store_empty(),
+ {new, #amqqueue { pid = QPid, name = QName } = Q} =
+ rabbit_amqqueue:declare(test_queue(), true, false, [], none),
+ TxID = rabbit_guid:guid(),
+ Payload = <<0:8388608>>, %% 1MB
+ Count = 30,
+ [begin
+ Msg = rabbit_basic:message(
+ rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = 2}, Payload),
+ Delivery = #delivery{mandatory = false, immediate = false, txn = TxID,
+ sender = self(), message = Msg},
+ true = rabbit_amqqueue:deliver(QPid, Delivery)
+ end || _ <- lists:seq(1, Count)],
+ rabbit_amqqueue:commit_all([QPid], TxID, self()),
+ rabbit_amqqueue:set_ram_duration_target(QPid, 0),
+
+ CountMinusOne = Count - 1,
+ {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
+ rabbit_amqqueue:basic_get(Q, self(), true),
+ {ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
+
+ %% give the queue a second to receive the close_fds callback msg
+ timer:sleep(1000),
+
+ rabbit_amqqueue:delete(Q, false, false),
+ passed.
+
test_configurable_server_properties() ->
%% List of the names of the built-in properties do we expect to find
BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 35e37df6..64f57962 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -436,10 +436,10 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms};
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
- PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
- PRef, MsgOnDiskFun),
- TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE,
- TRef, undefined),
+ PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
+ MsgOnDiskFun),
+ TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef,
+ undefined),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
QueueName, Terms1,
@@ -937,7 +937,12 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
Res.
msg_store_client_init(MsgStore, MsgOnDiskFun) ->
- rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun).
+ msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun).
+
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) ->
+ rabbit_msg_store:client_init(
+ MsgStore, Ref, MsgOnDiskFun,
+ msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)).
msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
with_immutable_msg_store_state(
@@ -964,6 +969,23 @@ msg_store_sync(MSCState, IsPersistent, Guids, Callback) ->
MSCState, IsPersistent,
fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end).
+msg_store_close_fds(MSCState, IsPersistent) ->
+ with_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
+
+msg_store_close_fds_fun(IsPersistent) ->
+ Self = self(),
+ fun () ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ Self,
+ fun (State = #vqstate { msg_store_clients = MSCState }) ->
+ {ok, MSCState1} =
+ msg_store_close_fds(MSCState, IsPersistent),
+ {[], State #vqstate { msg_store_clients = MSCState1 }}
+ end)
+ end.
+
maybe_write_delivered(false, _SeqId, IndexState) ->
IndexState;
maybe_write_delivered(true, SeqId, IndexState) ->