diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-19 10:42:25 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-19 10:42:25 +0000 |
commit | c9dc04c1ac3e2eb78c2c6bd4e83c7fb4f60950b5 (patch) | |
tree | aa7ee399fe602794dcb444d8796c992284058c03 /src | |
parent | cfa09e5fd38baa616b01c002e7e9e34bdc437b56 (diff) | |
parent | 7f6191345de6b256b1fce92626b4df43fc6bad7e (diff) | |
download | rabbitmq-server-c9dc04c1ac3e2eb78c2c6bd4e83c7fb4f60950b5.tar.gz |
merge from default
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 13 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 161 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 5 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 289 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 10 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 54 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 32 |
9 files changed, 340 insertions, 246 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9626e126..7ba5a17f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, maybe_run_queue_via_backing_queue_async/2, - update_ram_duration/1, set_ram_duration_target/2, + sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, @@ -69,6 +69,8 @@ -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). +-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). + -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(declare/5 :: @@ -147,7 +149,7 @@ -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) - -> rabbit_types:amqqueue() | 'not_found'). + -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit()). @@ -155,6 +157,7 @@ (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(maybe_run_queue_via_backing_queue_async/2 :: (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). +-spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -463,6 +466,9 @@ maybe_run_queue_via_backing_queue(QPid, Fun) -> maybe_run_queue_via_backing_queue_async(QPid, Fun) -> gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). +sync_timeout(QPid) -> + gen_server2:cast(QPid, sync_timeout). + update_ram_duration(QPid) -> gen_server2:cast(QPid, update_ram_duration). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 38b83117..fa425fa8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -224,11 +224,9 @@ next_state(State) -> false -> {stop_sync_timer(State2), hibernate} end. -ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> +ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, - rabbit_amqqueue, maybe_run_queue_via_backing_queue, - [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]), + ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -797,6 +795,7 @@ prioritise_cast(Msg, _State) -> {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; {maybe_run_queue_via_backing_queue, _Fun} -> 6; + sync_timeout -> 6; _ -> 0 end. @@ -1017,6 +1016,11 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> noreply(maybe_run_queue_via_backing_queue(Fun, State)); +handle_cast(sync_timeout, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS), + sync_timer_ref = undefined}); + handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 74fd00b7..9742c4b6 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -59,17 +59,18 @@ rabbit_types:exchange() | rabbit_types:amqqueue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). +-type(add_res() :: bind_res() | rabbit_misc:const(bind_res())). +-type(bind_or_error() :: bind_res() | rabbit_types:error('binding_not_found')). +-type(remove_res() :: bind_or_error() | rabbit_misc:const(bind_or_error())). -opaque(deletions() :: dict()). -spec(recover/0 :: () -> [rabbit_types:binding()]). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). --spec(add/1 :: (rabbit_types:binding()) -> bind_res()). --spec(remove/1 :: (rabbit_types:binding()) -> - bind_res() | rabbit_types:error('binding_not_found')). --spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). --spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> - bind_res() | rabbit_types:error('binding_not_found')). +-spec(add/1 :: (rabbit_types:binding()) -> add_res()). +-spec(remove/1 :: (rabbit_types:binding()) -> remove_res()). +-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> add_res()). +-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> remove_res()). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). -spec(list_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 78ecb774..ca424335 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed}). + confirm_enabled, publish_seqno, unconfirmed, confirmed}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -188,7 +188,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_trees:empty()}, + unconfirmed = gb_trees:empty(), + confirmed = []}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -204,8 +205,9 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - emit_stats -> 7; - _ -> 0 + emit_stats -> 7; + {confirm, _MsgSeqNos, _QPid} -> 5; + _ -> 0 end. handle_call(flush, _From, State) -> @@ -280,30 +282,27 @@ handle_cast({deliver, ConsumerTag, AckRequired, handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), - {noreply, - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, - hibernate}; + noreply([ensure_stats_timer], + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); -handle_cast({confirm, MsgSeqNos, From}, - State= #ch{stats_timer = StatsTimer}) -> - case rabbit_event:stats_level(StatsTimer) of - fine -> - {noreply, group_and_confirm(MsgSeqNos, From, State), hibernate}; - _ -> - {noreply, nogroup_confirm(MsgSeqNos, From, State), hibernate} - end. +handle_cast({confirm, MsgSeqNos, From}, State) -> + State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), + noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). + +handle_info(timeout, State) -> + noreply(State); handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{unconfirmed = UC}) -> %% TODO: this does a complete scan and partial rebuild of the %% tree, which is quite efficient. To do better we'd need to %% maintain a secondary mapping, from QPids to MsgSeqNos. - {EMs, UC1} = remove_queue_unconfirmed( + {MEs, UC1} = remove_queue_unconfirmed( gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}), - State1 = confirm_grouped(EMs, State#ch{unconfirmed = UC1}), + {[], UC}, State), erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State1), hibernate}. + noreply( + queue_blocked(QPid, record_confirms(MEs, State#ch{unconfirmed = UC1}))). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -333,11 +332,24 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> - {reply, Reply, ensure_stats_timer(NewState), hibernate}. +reply(Reply, NewState) -> reply(Reply, [], NewState). + +reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate). + +reply(Reply, Mask, NewState, Timeout) -> + {reply, Reply, next_state(Mask, NewState), Timeout}. + +noreply(NewState) -> noreply([], NewState). + +noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate). + +noreply(Mask, NewState, Timeout) -> + {noreply, next_state(Mask, NewState), Timeout}. -noreply(NewState) -> - {noreply, ensure_stats_timer(NewState), hibernate}. +next_state(Mask, State) -> + lists:foldl(fun (ensure_stats_timer, State1) -> ensure_stats_timer(State1); + (send_confirms, State1) -> send_confirms(State1) + end, State, [ensure_stats_timer, send_confirms] -- Mask). ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), @@ -476,61 +488,42 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QX, Acc) -> +remove_queue_unconfirmed(none, _QX, Acc, _State) -> Acc; -remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc) -> +remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc, State) -> remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, QX, Acc)). + remove_qmsg(MsgSeqNo, QPid, QX, Acc, State), + State). -group_and_confirm([], _QPid, State) -> +record_confirm(undefined, _, State) -> State; -group_and_confirm(MsgSeqNos, QPid, State) -> - {EMs, UC1} = take_from_unconfirmed(MsgSeqNos, QPid, State), - confirm_grouped(EMs, State#ch{unconfirmed=UC1}). - -confirm_grouped(EMs, State) -> - case lists:usort(EMs) of - [{XName, MsgSeqNo} | EMs1] -> - lists:foldl( - fun({XName1, MsgSeqNosE}, State0) -> - send_confirms(MsgSeqNosE, XName1, State0) - end, State, - group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}])); - [] -> - State - end. - -group_confirms_by_exchange([], Acc) -> - Acc; -group_confirms_by_exchange([{E, Msg1} | EMs], [{E, Msgs} | Acc]) -> - group_confirms_by_exchange(EMs, [{E, [Msg1 | Msgs]} | Acc]); -group_confirms_by_exchange([{E, Msg1} | EMs], Acc) -> - group_confirms_by_exchange(EMs, [{E, [Msg1]} | Acc]). +record_confirm(MsgSeqNo, XName, State) -> + record_confirms([{MsgSeqNo, XName}], State). -nogroup_confirm([], _QPid, State) -> +record_confirms([], State) -> State; -nogroup_confirm(MsgSeqNos, QPid, State) -> - {EMs, UC1} = take_from_unconfirmed(MsgSeqNos, QPid, State), - DoneMessages = [MsgSeqNo || {_XName, MsgSeqNo} <- EMs], - send_confirms(DoneMessages, undefined, State#ch{unconfirmed = UC1}). +record_confirms(MEs, State = #ch{confirmed = C}) -> + State#ch{confirmed = [MEs | C]}. -take_from_unconfirmed(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - lists:foldl( - fun(MsgSeqNo, {_DMs, UC0} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UC0) of - none -> Acc; - {value, {_, XName} = QX} -> - maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), - remove_qmsg(MsgSeqNo, QPid, QX, Acc) - end - end, {[], UC}, MsgSeqNos). - -remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {XMs, UC}) -> - %% remove QPid from MsgSeqNo's mapping +confirm([], _QPid, State) -> + State; +confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + {MEs, UC1} = + lists:foldl( + fun(MsgSeqNo, {_DMs, UC0} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UC0) of + none -> Acc; + {value,QX} -> remove_qmsg(MsgSeqNo, QPid, QX, Acc, State) + end + end, {[], UC}, MsgSeqNos), + record_confirms(MEs, State#ch{unconfirmed = UC1}). + +remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {MEs, UC}, State) -> Qs1 = sets:del_element(QPid, Qs), + maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), case sets:size(Qs1) of - 0 -> {[{XName, MsgSeqNo} | XMs], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {XMs, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)} + 0 -> {[{MsgSeqNo, XName} | MEs], gb_trees:delete(MsgSeqNo, UC)}; + _ -> {MEs, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -1252,12 +1245,12 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State#ch.writer_pid, no_route), - send_confirms([MsgSeqNo], XName, State); + record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State#ch.writer_pid, no_consumers), - send_confirms([MsgSeqNo], XName, State); + record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> - send_confirms([MsgSeqNo], XName, State); + record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> @@ -1271,15 +1264,19 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -send_confirms([], _, State) -> +send_confirms(State = #ch{confirmed = C, stats_timer = StatsTimer}) -> + MsgSeqNos = case rabbit_event:stats_level(StatsTimer) of + fine -> incr_confirm_exchange_stats(C, State); + _ -> [MsgSeqNo || {MsgSeqNo, _} <- C] + end, + send_confirms(MsgSeqNos, State #ch{confirmed = []}). +send_confirms([], State) -> State; -send_confirms([MsgSeqNo], XName, +send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> send_confirm(MsgSeqNo, WriterPid), - maybe_incr_stats([{XName, 1}], confirm, State), State; -send_confirms(Cs, XName, - State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> +send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), CutOff = case gb_trees:is_empty(UC) of true -> lists:last(SCs) + 1; @@ -1293,11 +1290,15 @@ send_confirms(Cs, XName, multiple = true}) end, [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], - maybe_incr_stats([{XName, length(Cs)}], confirm, State), State. -send_confirm(undefined, _WriterPid) -> - ok; +incr_confirm_exchange_stats(C, State) -> + lists:foldl( + fun({MsgSeqNo, ExchangeName}, MsgSeqNos0) -> + maybe_incr_stats([{ExchangeName, 1}], confirm, State), + [MsgSeqNo | MsgSeqNos0] + end, [], lists:append(C)). + send_confirm(SeqNo, WriterPid) -> ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{delivery_tag = SeqNo}). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9e8ba91b..14f03a77 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -75,10 +75,11 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1]). +-export_type([resource_name/0, thunk/1, const/1]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). +-type(const(T) :: fun((any()) -> T)). -type(resource_name() :: binary()). -type(optdef() :: {flag, string()} | {option, string(), any()}). -type(channel_or_connection_exit() @@ -204,7 +205,7 @@ -spec(now_ms/0 :: () -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). -spec(const_ok/1 :: (any()) -> 'ok'). --spec(const/1 :: (A) -> fun ((_) -> A)). +-spec(const/1 :: (A) -> const(A)). -endif. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f8b41ed3..80e319dd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,8 +34,8 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/3, client_terminate/1, client_delete_and_terminate/1, - client_ref/1, + client_init/4, client_terminate/1, client_delete_and_terminate/1, + client_ref/1, close_all_indicated/1, write/3, read/2, contains/2, remove/2, release/2, sync/3]). -export([sync/1, set_maximum_since_use/2, @@ -82,10 +82,10 @@ dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table dying_clients, %% set of dying clients - client_refs, %% set of references of all registered clients + clients, %% map of references of all registered clients + %% to callbacks successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? - client_ondisk_callback, %% client ref to callback function mapping cref_to_guids %% client ref to synced messages mapping }). @@ -111,6 +111,7 @@ index_module, index_state, file_summary_ets, + file_handles_ets, msg_store }). @@ -124,6 +125,7 @@ index_module :: atom(), index_state :: any(), file_summary_ets :: ets:tid(), + file_handles_ets :: ets:tid(), msg_store :: server() }). @@ -146,13 +148,15 @@ {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). -type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())). +-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')). +-type(deletion_thunk() :: fun (() -> boolean())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/3 :: (server(), client_ref(), maybe_guid_fun()) -> - client_msstate()). +-spec(client_init/4 :: (server(), client_ref(), maybe_guid_fun(), + maybe_close_fds_fun()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_ref/1 :: (client_msstate()) -> client_ref()). @@ -169,8 +173,8 @@ -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> - 'ok'). --spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> 'ok'). + deletion_thunk()). +-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()). -endif. @@ -380,9 +384,9 @@ %% %% We use a separate set to keep track of the dying clients in order %% to keep that set, which is inspected on every write and remove, as -%% small as possible. Inspecting client_refs - the set of all clients -%% - would degrade performance with many healthy clients and few, if -%% any, dying clients, which is the typical case. +%% small as possible. Inspecting the set of all clients would degrade +%% performance with many healthy clients and few, if any, dying +%% clients, which is the typical case. %% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -399,11 +403,11 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -client_init(Server, Ref, MsgOnDiskFun) -> +client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, - infinity), + gen_server2:call( + Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -523,7 +527,8 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, - gc_pid = GCPid }) -> + gc_pid = GCPid, + client_ref = Ref }) -> Release = fun() -> ok = case ets:update_counter(FileSummaryEts, File, {#file_summary.readers, -1}) of @@ -570,9 +575,14 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, case index_lookup(Guid, CState) of #msg_location { file = File } = MsgLocation -> %% Still the same file. - mark_handle_open(FileHandlesEts, File), - - CState1 = close_all_indicated(CState), + {ok, CState1} = close_all_indicated(CState), + %% We are now guaranteed that the mark_handle_open + %% call will either insert_new correctly, or will + %% fail, but find the value is open, not close. + mark_handle_open(FileHandlesEts, File, Ref), + %% Could the msg_store now mark the file to be + %% closed? No: marks for closing are issued only + %% when the msg_store has locked the file. {Msg, CState2} = %% This will never be the current file read_from_disk(MsgLocation, CState1, DedupCacheEts), Release(), %% this MUST NOT fail with badarg @@ -588,11 +598,10 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, end end. -clear_client_callback(CRef, - State = #msstate { client_ondisk_callback = CODC, - cref_to_guids = CTG }) -> - State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), - cref_to_guids = dict:erase(CRef, CTG)}. +clear_client(CRef, State = #msstate { cref_to_guids = CTG, + dying_clients = DyingClients }) -> + State #msstate { cref_to_guids = dict:erase(CRef, CTG), + dying_clients = sets:del_element(CRef, DyingClients) }. %%---------------------------------------------------------------------------- @@ -628,6 +637,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {CleanShutdown, IndexState, ClientRefs1} = recover_index_and_client_refs(IndexModule, FileSummaryRecovered, ClientRefs, Dir, Server), + Clients = dict:from_list( + [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]), %% CleanShutdown => msg location index and file_summary both %% recovered correctly. true = case {FileSummaryRecovered, CleanShutdown} of @@ -661,10 +672,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, dying_clients = sets:new(), - client_refs = ClientRefs1, + clients = Clients, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, - client_ondisk_callback = dict:new(), cref_to_guids = dict:new() }, @@ -684,6 +694,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> index_module = IndexModule, index_state = IndexState, file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, msg_store = self() }), @@ -694,10 +705,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - successfully_recovered_state -> 7; - {new_client_state, _Ref, _MODC} -> 7; - {read, _Guid} -> 2; - _ -> 0 + successfully_recovered_state -> 7; + {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7; + {read, _Guid} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -713,7 +724,7 @@ prioritise_cast(Msg, _State) -> handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({new_client_state, CRef, Callback}, _From, +handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, State = #msstate { dir = Dir, index_state = IndexState, index_module = IndexModule, @@ -721,21 +732,15 @@ handle_call({new_client_state, CRef, Callback}, _From, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs, - client_ondisk_callback = CODC, + clients = Clients, gc_pid = GCPid }) -> - CODC1 = case Callback of - undefined -> CODC; - _ -> dict:store(CRef, Callback, CODC) - end, + Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { client_refs = sets:add_element(CRef, ClientRefs), - client_ondisk_callback = CODC1 }); + State #msstate { clients = Clients1 }); -handle_call({client_terminate, CRef}, _From, - State) -> - reply(ok, clear_client_callback(CRef, State)); +handle_call({client_terminate, CRef}, _From, State) -> + reply(ok, clear_client(CRef, State)); handle_call({read, Guid}, From, State) -> State1 = read_message(Guid, From, State), @@ -750,36 +755,26 @@ handle_cast({client_dying, CRef}, DyingClients1 = sets:add_element(CRef, DyingClients), write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); -handle_cast({client_delete, CRef}, - State = #msstate { client_refs = ClientRefs, - dying_clients = DyingClients }) -> - State1 = clear_client_callback( - CRef, State #msstate { - client_refs = sets:del_element(CRef, ClientRefs), - dying_clients = sets:del_element(CRef, DyingClients) }), - noreply(remove_message(CRef, CRef, State1)); +handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> + State1 = State #msstate { clients = dict:erase(CRef, Clients) }, + noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, Guid}, - State = #msstate { file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts, - client_ondisk_callback = CODC, - cref_to_guids = CTG }) -> - + State = #msstate { file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC), - State1 = State #msstate { cref_to_guids = CTG1 }, case should_mask_action(CRef, Guid, State) of {true, _Location} -> noreply(State); {false, not_found} -> - write_message(Guid, Msg, State1); + write_message(CRef, Guid, Msg, State); {Mask, #msg_location { ref_count = 0, file = File, total_size = TotalSize }} -> case {Mask, ets:lookup(FileSummaryEts, File)} of {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State1), - write_message(Guid, Msg, State1); + ok = index_delete(Guid, State), + write_message(CRef, Guid, Msg, State); {false_if_increment, [#file_summary { locked = true }]} -> %% The msg for Guid is older than the client death %% message, but as it is being GC'd currently, @@ -788,8 +783,8 @@ handle_cast({write, CRef, Guid}, noreply(State); {_Mask, [#file_summary {}]} -> ok = index_update_ref_count(Guid, 1, State), - State2 = client_confirm_if_on_disk(CRef, Guid, File, State), - noreply(adjust_valid_total_size(File, TotalSize, State2)) + State1 = client_confirm_if_on_disk(CRef, Guid, File, State), + noreply(adjust_valid_total_size(File, TotalSize, State1)) end; {_Mask, #msg_location { ref_count = RefCount, file = File }} -> %% We already know about it, just update counter. Only @@ -832,10 +827,11 @@ handle_cast(sync, State) -> handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts }) -> + file_summary_ets = FileSummaryEts, + clients = Clients }) -> ok = cleanup_after_file_deletion(Source, State), - %% see comment in cleanup_after_file_deletion - true = mark_handle_to_close(FileHandlesEts, Destination), + %% see comment in cleanup_after_file_deletion, and client_read3 + true = mark_handle_to_close(Clients, FileHandlesEts, Destination, false), true = ets:update_element(FileSummaryEts, Destination, {#file_summary.locked, false}), State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, @@ -865,7 +861,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs, + clients = Clients, dir = Dir }) -> %% stop the gc first, otherwise it could be working and we pull %% out the ets tables from under it. @@ -881,7 +877,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, [ets:delete(T) || T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]], IndexModule:terminate(IndexState), - store_recovery_terms([{client_refs, sets:to_list(ClientRefs)}, + store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, {index_module, IndexModule}], Dir), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -943,6 +939,9 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. +write_message(CRef, Guid, Msg, State) -> + write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). + write_message(Guid, Msg, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, @@ -1135,46 +1134,44 @@ orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). -client_confirm(CRef, Guids, ActionTaken, - State = #msstate { client_ondisk_callback = CODC, - cref_to_guids = CTG }) -> - case dict:find(CRef, CODC) of - {ok, Fun} -> Fun(Guids, ActionTaken), - CTG1 = case dict:find(CRef, CTG) of - {ok, Gs} -> - Guids1 = gb_sets:difference(Gs, Guids), - case gb_sets:is_empty(Guids1) of - true -> dict:erase(CRef, CTG); - false -> dict:store(CRef, Guids1, CTG) - end; - error -> CTG - end, - State #msstate { cref_to_guids = CTG1 }; - error -> State - end. - -add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) -> - case dict:find(CRef, CODC) of - {ok, _} -> dict:update(CRef, - fun (Guids) -> gb_sets:add(Guid, Guids) end, - gb_sets:singleton(Guid), CTG); - error -> CTG +update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, + cref_to_guids = CTG }) -> + case dict:fetch(CRef, Clients) of + {undefined, _CloseFDsFun} -> State; + {MsgOnDiskFun, _CloseFDsFun} -> CTG1 = Fun(MsgOnDiskFun, CTG), + State #msstate { cref_to_guids = CTG1 } end. -client_confirm_if_on_disk(CRef, Guid, File, - State = #msstate { client_ondisk_callback = CODC, - current_file = CurFile, - cref_to_guids = CTG }) -> - CTG1 = - case File of - CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC); - _ -> case dict:find(CRef, CODC) of - {ok, Fun} -> Fun(gb_sets:singleton(Guid), written); - _ -> ok - end, - CTG - end, - State #msstate { cref_to_guids = CTG1 }. +record_pending_confirm(CRef, Guid, State) -> + update_pending_confirms( + fun (_MsgOnDiskFun, CTG) -> + dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end, + gb_sets:singleton(Guid), CTG) + end, CRef, State). + +client_confirm_if_on_disk(CRef, Guid, CurFile, + State = #msstate { current_file = CurFile }) -> + record_pending_confirm(CRef, Guid, State); +client_confirm_if_on_disk(CRef, Guid, _File, State) -> + update_pending_confirms( + fun (MsgOnDiskFun, CTG) -> + MsgOnDiskFun(gb_sets:singleton(Guid), written), + CTG + end, CRef, State). + +client_confirm(CRef, Guids, ActionTaken, State) -> + update_pending_confirms( + fun (MsgOnDiskFun, CTG) -> + MsgOnDiskFun(Guids, ActionTaken), + case dict:find(CRef, CTG) of + {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), + case gb_sets:is_empty(Guids1) of + true -> dict:erase(CRef, CTG); + false -> dict:store(CRef, Guids1, CTG) + end; + error -> CTG + end + end, CRef, State). %% Detect whether the Guid is older or younger than the client's death %% msg (if there is one). If the msg is older than the client death @@ -1221,29 +1218,54 @@ close_handle(Key, FHC) -> error -> FHC end. -mark_handle_open(FileHandlesEts, File) -> - %% This is fine to fail (already exists) - ets:insert_new(FileHandlesEts, {{self(), File}, open}), +mark_handle_open(FileHandlesEts, File, Ref) -> + %% This is fine to fail (already exists). Note it could fail with + %% the value being close, and not have it updated to open. + ets:insert_new(FileHandlesEts, {{Ref, File}, open}), true. -mark_handle_to_close(FileHandlesEts, File) -> - [ ets:update_element(FileHandlesEts, Key, {2, close}) - || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ], +%% See comment in client_read3 - only call this when the file is locked +mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) -> + [ begin + case (ets:update_element(FileHandlesEts, Key, {2, close}) + andalso Invoke) of + true -> case dict:fetch(Ref, ClientRefs) of + {_MsgOnDiskFun, undefined} -> ok; + {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun() + end; + false -> ok + end + end || {{Ref, _File} = Key, open} <- + ets:match_object(FileHandlesEts, {{'_', File}, open}) ], true. -close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = +safe_file_delete_fun(File, Dir, FileHandlesEts) -> + fun () -> safe_file_delete(File, Dir, FileHandlesEts) end. + +safe_file_delete(File, Dir, FileHandlesEts) -> + %% do not match on any value - it's the absence of the row that + %% indicates the client has really closed the file. + case ets:match_object(FileHandlesEts, {{'_', File}, '_'}, 1) of + {[_|_], _Cont} -> false; + _ -> ok = file:delete( + form_filename(Dir, filenum_to_name(File))), + true + end. + +close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts, + client_ref = Ref } = CState) -> - Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}), - lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) -> - true = ets:delete(FileHandlesEts, Key), - close_handle(File, CStateM) - end, CState, Objs). - -close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts, - file_handle_cache = FHC }) -> - Self = self(), + Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}), + {ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) -> + true = ets:delete(FileHandlesEts, Key), + close_handle(File, CStateM) + end, CState, Objs)}. + +close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts, + file_handle_cache = FHC, + client_ref = Ref }) -> ok = dict:fold(fun (File, Hdl, ok) -> - true = ets:delete(FileHandlesEts, {Self, File}), + true = ets:delete(FileHandlesEts, {Ref, File}), file_handle_cache:close(Hdl) end, ok, FHC), CState #client_msstate { file_handle_cache = dict:new() }; @@ -1381,16 +1403,16 @@ index_delete_by_file(File, #msstate { index_module = Index, %%---------------------------------------------------------------------------- recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) -> - {false, IndexModule:new(Dir), sets:new()}; + {false, IndexModule:new(Dir), []}; recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) -> rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]), - {false, IndexModule:new(Dir), sets:new()}; + {false, IndexModule:new(Dir), []}; recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> Fresh = fun (ErrorMsg, ErrorArgs) -> rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n" "rebuilding indices from scratch~n", [Server | ErrorArgs]), - {false, IndexModule:new(Dir), sets:new()} + {false, IndexModule:new(Dir), []} end, case read_recovery_terms(Dir) of {false, Error} -> @@ -1402,8 +1424,7 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> andalso IndexModule =:= RecIndexModule) of true -> case IndexModule:recover(Dir) of {ok, IndexState1} -> - {true, IndexState1, - sets:from_list(ClientRefs)}; + {true, IndexState1, ClientRefs}; {error, Error} -> Fresh("failed to recover index: ~p", [Error]) end; @@ -1744,7 +1765,8 @@ delete_file_if_empty(File, State = #msstate { cleanup_after_file_deletion(File, #msstate { file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts }) -> + file_summary_ets = FileSummaryEts, + clients = Clients }) -> %% Ensure that any clients that have open fhs to the file close %% them before using them again. This has to be done here (given %% it's done in the msg_store, and not the gc), and not when @@ -1752,7 +1774,7 @@ cleanup_after_file_deletion(File, %% the client could find the close, and close and reopen the fh, %% whilst the GC is waiting for readers to disappear, before it's %% actually done the GC. - true = mark_handle_to_close(FileHandlesEts, File), + true = mark_handle_to_close(Clients, FileHandlesEts, File, true), [#file_summary { left = Left, right = Right, locked = true, @@ -1781,6 +1803,7 @@ has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> combine_files(Source, Destination, State = #gc_state { file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, dir = Dir, msg_store = Server }) -> [#file_summary { @@ -1841,7 +1864,7 @@ combine_files(Source, Destination, SourceHdl, DestinationHdl, Destination, State), %% tidy up ok = file_handle_cache:close(DestinationHdl), - ok = file_handle_cache:delete(SourceHdl), + ok = file_handle_cache:close(SourceHdl), %% don't update dest.right, because it could be changing at the %% same time @@ -1851,9 +1874,11 @@ combine_files(Source, Destination, {#file_summary.file_size, TotalValidData}]), Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData, - gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}). + gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}), + safe_file_delete_fun(Source, Dir, FileHandlesEts). delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, dir = Dir, msg_store = Server }) -> [#file_summary { valid_total_size = 0, @@ -1861,8 +1886,8 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, file_size = FileSize, readers = 0 }] = ets:lookup(FileSummaryEts, File), {[], 0} = load_and_vacuum_message_file(File, State), - ok = file:delete(form_filename(Dir, filenum_to_name(File))), - gen_server2:cast(Server, {delete_file, File, FileSize}). + gen_server2:cast(Server, {delete_file, File, FileSize}), + safe_file_delete_fun(File, Dir, FileHandlesEts). load_and_vacuum_message_file(File, #gc_state { dir = Dir, index_module = Index, diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index cd9fd497..428f8b10 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -42,6 +42,7 @@ -record(state, { pending_no_readers, + on_action, msg_store_state }). @@ -89,6 +90,7 @@ init([MsgStoreState]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), {ok, #state { pending_no_readers = dict:new(), + on_action = [], msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -131,11 +133,15 @@ code_change(_OldVsn, State, _Extra) -> attempt_action(Action, Files, State = #state { pending_no_readers = Pending, + on_action = Thunks, msg_store_state = MsgStoreState }) -> case [File || File <- Files, rabbit_msg_store:has_readers(File, MsgStoreState)] of - [] -> do_action(Action, Files, MsgStoreState), - State; + [] -> State #state { + on_action = lists:filter( + fun (Thunk) -> not Thunk() end, + [do_action(Action, Files, MsgStoreState) | + Thunks]) }; [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending), State #state { pending_no_readers = Pending1 } end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1709ef3c..91920f9b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1443,6 +1443,7 @@ test_backing_queue() -> passed = test_queue_index(), passed = test_queue_index_props(), passed = test_variable_queue(), + passed = test_variable_queue_delete_msg_store_files_callback(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, MaxJournal, infinity), @@ -1459,6 +1460,9 @@ restart_msg_store_empty() -> guid_bin(X) -> erlang:md5(term_to_binary(X)). +msg_store_client_init(MsgStore, Ref) -> + rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). + msg_store_contains(Atom, Guids, MSCState) -> Atom = lists:foldl( fun (Guid, Atom1) when Atom1 =:= Atom -> @@ -1502,12 +1506,12 @@ msg_store_remove(MsgStore, Ref, Guids) -> with_msg_store_client(MsgStore, Ref, Fun) -> rabbit_msg_store:client_terminate( - Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined))). + Fun(msg_store_client_init(MsgStore, Ref))). foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref, undefined), L)). + msg_store_client_init(MsgStore, Ref), L)). test_msg_store() -> restart_msg_store_empty(), @@ -1515,8 +1519,7 @@ test_msg_store() -> Guids = [guid_bin(M) || M <- lists:seq(1,100)], {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), Ref = rabbit_guid:guid(), - MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, - undefined), + MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, Guids, MSCState), %% publish the first half @@ -1582,8 +1585,7 @@ test_msg_store() -> ([Guid|GuidsTail]) -> {Guid, 0, GuidsTail} end, Guids2ndHalf}), - MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, - undefined), + MSCState5 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), %% check we have the right msgs left lists:foldl( fun (Guid, Bool) -> @@ -1592,8 +1594,7 @@ test_msg_store() -> ok = rabbit_msg_store:client_terminate(MSCState5), %% restart empty restart_msg_store_empty(), - MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, - undefined), + MSCState6 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs false = msg_store_contains(false, Guids, MSCState6), %% publish the first half again @@ -1601,8 +1602,7 @@ test_msg_store() -> %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( msg_store_read(Guids1stHalf, MSCState6)), - MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, - undefined), + MSCState7 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7), ok = rabbit_msg_store:client_terminate(MSCState7), %% restart empty @@ -1660,8 +1660,7 @@ init_test_queue() -> TestQueue = test_queue(), Terms = rabbit_queue_index:shutdown_terms(TestQueue), PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), - PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, - PRef, undefined), + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), Res = rabbit_queue_index:recover( TestQueue, Terms, false, fun (Guid) -> @@ -1695,7 +1694,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE end, - MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined), + MSCState = msg_store_client_init(MsgStore, Ref), {A, B = [{_SeqId, LastGuidWritten} | _]} = lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> @@ -2123,6 +2122,35 @@ test_queue_recover() -> end), passed. +test_variable_queue_delete_msg_store_files_callback() -> + ok = restart_msg_store_empty(), + {new, #amqqueue { pid = QPid, name = QName } = Q} = + rabbit_amqqueue:declare(test_queue(), true, false, [], none), + TxID = rabbit_guid:guid(), + Payload = <<0:8388608>>, %% 1MB + Count = 30, + [begin + Msg = rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = 2}, Payload), + Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, + sender = self(), message = Msg}, + true = rabbit_amqqueue:deliver(QPid, Delivery) + end || _ <- lists:seq(1, Count)], + rabbit_amqqueue:commit_all([QPid], TxID, self()), + rabbit_amqqueue:set_ram_duration_target(QPid, 0), + + CountMinusOne = Count - 1, + {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = + rabbit_amqqueue:basic_get(Q, self(), true), + {ok, CountMinusOne} = rabbit_amqqueue:purge(Q), + + %% give the queue a second to receive the close_fds callback msg + timer:sleep(1000), + + rabbit_amqqueue:delete(Q, false, false), + passed. + test_configurable_server_properties() -> %% List of the names of the built-in properties do we expect to find BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 35e37df6..64f57962 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -436,10 +436,10 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms}; _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, - PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, - PRef, MsgOnDiskFun), - TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, - TRef, undefined), + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, + MsgOnDiskFun), + TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef, + undefined), {DeltaCount, IndexState} = rabbit_queue_index:recover( QueueName, Terms1, @@ -937,7 +937,12 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> Res. msg_store_client_init(MsgStore, MsgOnDiskFun) -> - rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun). + msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun). + +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) -> + rabbit_msg_store:client_init( + MsgStore, Ref, MsgOnDiskFun, + msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)). msg_store_write(MSCState, IsPersistent, Guid, Msg) -> with_immutable_msg_store_state( @@ -964,6 +969,23 @@ msg_store_sync(MSCState, IsPersistent, Guids, Callback) -> MSCState, IsPersistent, fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end). +msg_store_close_fds(MSCState, IsPersistent) -> + with_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). + +msg_store_close_fds_fun(IsPersistent) -> + Self = self(), + fun () -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + Self, + fun (State = #vqstate { msg_store_clients = MSCState }) -> + {ok, MSCState1} = + msg_store_close_fds(MSCState, IsPersistent), + {[], State #vqstate { msg_store_clients = MSCState1 }} + end) + end. + maybe_write_delivered(false, _SeqId, IndexState) -> IndexState; maybe_write_delivered(true, SeqId, IndexState) -> |