diff options
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 9 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 34 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 108 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 20 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 84 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 66 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 214 |
8 files changed, 308 insertions, 243 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index d41fcb17..b85e4ad6 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -25,10 +25,13 @@ -type(message_properties_transformer() :: fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). +-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). +-type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(init/2 :: (rabbit_types:amqqueue(), attempt_recovery()) -> state()). +-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(), + async_callback(), sync_callback()) -> state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). @@ -41,6 +44,7 @@ (false, rabbit_types:basic_message(), rabbit_types:message_properties(), pid(), state()) -> {undefined, state()}). +-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). -spec(dropwhile/2 :: (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). @@ -66,7 +70,6 @@ -spec(idle_timeout/1 :: (state()) -> state()). -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). --spec(invoke/3 :: (atom(), fun ((A) -> A), state()) -> - {[rabbit_guid:guid()], state()}). +-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). -spec(validate_message/2 :: (rabbit_types:basic_message(), state()) -> {'invalid' | 'valid', state()}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 36b1662e..9820567c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -17,17 +17,11 @@ -module(rabbit_amqqueue). -export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). --export([internal_declare/2, internal_delete/1, - maybe_run_queue_via_backing_queue/3, - maybe_run_queue_via_backing_queue_async/3, - sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/4, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([emit_stats/1]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). @@ -35,6 +29,14 @@ -export([on_node_down/1]). -export([store_queue/1]). + +%% internal +-export([internal_declare/2, internal_delete/1, + run_backing_queue/3, run_backing_queue_async/3, + sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, + set_maximum_since_use/2, maybe_expire/1, drop_expired/1, + emit_stats/1]). + -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -141,10 +143,12 @@ rabbit_types:connection_exit() | fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit())). --spec(maybe_run_queue_via_backing_queue/3 :: - (pid(), atom(), (fun ((A) -> {[rabbit_guid:msg_id()], A}))) -> 'ok'). --spec(maybe_run_queue_via_backing_queue_async/3 :: - (pid(), atom(), (fun ((A) -> {[rabbit_guid:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue/3 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue_async/3 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). @@ -440,13 +444,11 @@ internal_delete(QueueName) -> end end). +run_backing_queue(QPid, Mod, Fun) -> + gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity). -maybe_run_queue_via_backing_queue(QPid, Mod, Fun) -> - gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}, - infinity). - -maybe_run_queue_via_backing_queue_async(QPid, Mod, Fun) -> - gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}). +run_backing_queue_async(QPid, Mod, Fun) -> + gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d8cd510b..5aedb630 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -178,7 +178,7 @@ declare(Recover, From, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQS = BQ:init(Q, Recover), + BQS = bq_init(BQ, Q, Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -188,6 +188,20 @@ declare(Recover, From, Q1 -> {stop, normal, {existing, Q1}, State} end. +bq_init(BQ, Q, Recover) -> + Self = self(), + BQ:init(Q, Recover, + fun (Mod, Fun) -> + rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun) + end, + fun (Mod, Fun) -> + rabbit_misc:with_exit_handler( + fun () -> error end, + fun () -> + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) + end) + end). + process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> lists:foldl(fun({Arg, Fun}, State1) -> case rabbit_misc:table_lookup(Arguments, Arg) of @@ -230,13 +244,15 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, NewState1, Timeout}. -next_state(State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - ensure_rate_timer(State), - State2 = ensure_stats_timer(State1), - case BQ:needs_idle_timeout(BQS) of - true -> {ensure_sync_timer(State2), 0}; - false -> {stop_sync_timer(State2), hibernate} +next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + State1 = ensure_stats_timer( + ensure_rate_timer( + confirm_messages(MsgIds, State#q{ + backing_queue_state = BQS1}))), + case BQ:needs_idle_timeout(BQS1) of + true -> {ensure_sync_timer(State1), 0}; + false -> {stop_sync_timer(State1), hibernate} end. backing_queue_module(#amqqueue{arguments = Args}) -> @@ -435,6 +451,8 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. +confirm_messages([], State) -> + State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> @@ -548,12 +566,12 @@ deliver_or_enqueue(Delivery, State) -> ensure_ttl_timer(State1#q{backing_queue_state = BQS1}) end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl = TTL}) -> - maybe_run_queue_via_backing_queue( - BQ, fun (BQS) -> - {_Guids, BQS1} = - BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS), - {[], BQS1} +requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> + run_backing_queue( + BQ, fun (M, BQS) -> + {_MsgIds, BQS1} = + M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS), + BQS1 end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, @@ -657,15 +675,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> - maybe_run_queue_via_backing_queue( - BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State). -maybe_run_queue_via_backing_queue(Mod, Fun, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), - run_message_queue( - confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})). +run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, State = #q{backing_queue = BQ, @@ -798,29 +812,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {run_backing_queue, _Mod, _Fun} -> 6; + _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - delete_immediately -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _AckTags, _ChPid} -> 7; - {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; - {unblock, _ChPid} -> 7; - {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; - _ -> 0 + update_ram_duration -> 8; + delete_immediately -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + {ack, _Txn, _AckTags, _ChPid} -> 7; + {reject, _AckTags, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + {run_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + _ -> 0 end. prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, @@ -837,14 +851,14 @@ handle_call({init, Recover}, From, true -> erlang:monitor(process, Owner), declare(Recover, From, State); false -> #q{backing_queue = BQ, backing_queue_state = undefined, - q = #amqqueue{name = QName, durable = IsDurable}} = State, + q = #amqqueue{name = QName} = Q} = State, gen_server2:reply(From, not_found), case Recover of true -> ok; _ -> rabbit_log:warning( "Queue ~p exclusive owner went away~n", [QName]) end, - BQS = BQ:init(QName, IsDurable, Recover), + BQS = bq_init(BQ, Q, Recover), %% Rely on terminate to delete the queue. {stop, normal, State#q{backing_queue_state = BQS}} end; @@ -1032,12 +1046,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue_and_run(AckTags, State)) end; -handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) -> - reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)). +handle_call({run_backing_queue, Mod, Fun}, _From, State) -> + reply(ok, run_backing_queue(Mod, Fun, State)). -handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) -> - noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State)); +handle_cast({run_backing_queue, Mod, Fun}, State) -> + noreply(run_backing_queue(Mod, Fun, State)); handle_cast(sync_timeout, State) -> noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 726b9bef..ce6143dd 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -33,7 +33,21 @@ behaviour_info(callbacks) -> {stop, 0}, %% Initialise the backing queue and its state. - {init, 2}, + %% + %% Takes + %% 1. the amqqueue record + %% 2. a boolean indicating whether the queue is an existing queue + %% that should be recovered + %% 3. an asynchronous callback which accepts a function from + %% state to state and invokes it with the current backing + %% queue state. This is useful for handling events, e.g. when + %% the backing queue does not have its own process to receive + %% such events, or when the processing of an event results in + %% a state transition the queue logic needs to know about + %% (such as messages getting confirmed). + %% 4. a synchronous callback. Same as the asynchronous callback + %% but waits for completion and returns 'error' on error. + {init, 4}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 1}, @@ -54,6 +68,10 @@ behaviour_info(callbacks) -> %% (i.e. saves the round trip through the backing queue). {publish_delivered, 5}, + %% Return ids of messages which have been confirmed since + %% the last invocation of this function (or initialisation). + {drain_confirmed, 1}, + %% Drop messages from the head of the queue while the supplied %% predicate returns true. {dropwhile, 2}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 25a1e4b8..0ca73f03 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -16,10 +16,10 @@ -module(rabbit_mirror_queue_master). --export([init/2, terminate/1, delete_and_terminate/1, +-export([init/4, terminate/1, delete_and_terminate/1, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, + requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1, invoke/3, validate_message/2]). @@ -37,7 +37,8 @@ backing_queue, backing_queue_state, set_delivered, - seen_status + seen_status, + confirmed }). %% --------------------------------------------------------------------------- @@ -53,7 +54,8 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { arguments = Args, name = QName } = Q, Recover) -> +init(#amqqueue { arguments = Args, name = QName } = Q, Recover, + AsyncCallback, SyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>), @@ -64,13 +66,14 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover) -> end, [rabbit_mirror_queue_misc:add_slave(QName, Node) || Node <- Nodes1], {ok, BQ} = application:get_env(backing_queue_module), - BQS = BQ:init(Q, Recover), + BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback), #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, set_delivered = 0, - seen_status = dict:new() }. + seen_status = dict:new(), + confirmed = [] }. promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> #state { gm = GM, @@ -78,7 +81,8 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> backing_queue = BQ, backing_queue_state = BQS, set_delivered = BQ:len(BQS), - seen_status = SeenStatus }. + seen_status = SeenStatus, + confirmed = [] }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -138,6 +142,35 @@ dropwhile(Fun, State = #state { gm = GM, State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 }. +drain_confirmed(State = #state { backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS, + confirmed = Confirmed }) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + {MsgIds1, SS1} = + lists:foldl( + fun (MsgId, {MsgIdsN, SSN}) -> + case dict:find(MsgId, SSN) of + error -> + {[MsgId | MsgIdsN], SSN}; + {ok, published} -> + %% It was published when we were a slave, + %% and we were promoted before we saw the + %% publish from the channel. We still + %% haven't seen the channel publish, and + %% consequently we need to filter out the + %% confirm here. We will issue the confirm + %% when we see the publish from the channel. + {MsgIdsN, dict:store(MsgId, confirmed, SSN)}; + {ok, confirmed} -> + %% Well, confirms are racy by definition. + {[MsgId | MsgIdsN], SSN} + end + end, {[], SS}, MsgIds), + {Confirmed ++ MsgIds1, State #state { backing_queue_state = BQS1, + seen_status = SS1, + confirmed = [] }}. + fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, @@ -236,38 +269,16 @@ status(#state { backing_queue = BQ, backing_queue_state = BQS}) -> BQ:status(BQS). invoke(?MODULE, Fun, State) -> - Fun(State); + Fun(?MODULE, State); invoke(Mod, Fun, State = #state { backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> - {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), - {MsgIds1, SS1} = - lists:foldl( - fun (MsgId, {MsgIdsN, SSN}) -> - case dict:find(MsgId, SSN) of - error -> - {[MsgId | MsgIdsN], SSN}; - {ok, published} -> - %% It was published when we were a slave, - %% and we were promoted before we saw the - %% publish from the channel. We still - %% haven't seen the channel publish, and - %% consequently we need to filter out the - %% confirm here. We will issue the confirm - %% when we see the publish from the channel. - {MsgIdsN, dict:store(MsgId, confirmed, SSN)}; - {ok, confirmed} -> - %% Well, confirms are racy by definition. - {[MsgId | MsgIdsN], SSN} - end - end, {[], SS}, MsgIds), - {MsgIds1, State #state { backing_queue_state = BQS1, - seen_status = SS1 }}. + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. validate_message(Message = #basic_message { id = MsgId }, State = #state { seen_status = SS, backing_queue = BQ, - backing_queue_state = BQS }) -> + backing_queue_state = BQS, + confirmed = Confirmed }) -> %% Here, we need to deal with the possibility that we're about to %% receive a message that we've already seen when we were a slave %% (we received it via gm). Thus if we do receive such message now @@ -299,7 +310,6 @@ validate_message(Message = #basic_message { id = MsgId }, %% need to confirm now. As above, amqqueue_process will %% have the entry for the msg_id_to_channel mapping added %% immediately prior to calling validate_message/2. - ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - self(), ?MODULE, fun (State1) -> {[MsgId], State1} end), - {invalid, State #state { seen_status = dict:erase(MsgId, SS) }} + {invalid, State #state { seen_status = dict:erase(MsgId, SS), + confirmed = [MsgId | Confirmed] }} end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 064dc329..d20b00d4 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -94,7 +94,7 @@ init([#amqqueue { name = QueueName } = Q]) -> ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), {ok, BQ} = application:get_env(backing_queue_module), - BQS = BQ:init(Q, false), + BQS = bq_init(BQ, Q, false), {ok, #state { q = Q, gm = GM, master_node = node(MPid), @@ -154,12 +154,12 @@ handle_call({gm_deaths, Deaths}, From, {stop, normal, State} end; -handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) -> - reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)). +handle_call({run_backing_queue, Mod, Fun}, _From, State) -> + reply(ok, run_backing_queue(Mod, Fun, State)). -handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) -> - noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State)); +handle_cast({run_backing_queue, Mod, Fun}, State) -> + noreply(run_backing_queue(Mod, Fun, State)); handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); @@ -235,20 +235,20 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _State) -> case Msg of - {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; - {gm_deaths, _Deaths} -> 5; - _ -> 0 + {run_backing_queue, _Mod, _Fun} -> 6; + {gm_deaths, _Deaths} -> 5; + _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; - {gm, _Msg} -> 5; - _ -> 0 + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + {run_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + {gm, _Msg} -> 5; + _ -> 0 end. %% --------------------------------------------------------------------------- @@ -282,12 +282,23 @@ handle_msg([SPid], _From, Msg) -> %% Others %% --------------------------------------------------------------------------- -maybe_run_queue_via_backing_queue( - Mod, Fun, State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), - confirm_messages(MsgIds, State #state { backing_queue_state = BQS1 }). - +bq_init(BQ, Q, Recover) -> + Self = self(), + BQ:init(Q, Recover, + fun (Mod, Fun) -> + rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun) + end, + fun (Mod, Fun) -> + rabbit_misc:with_exit_handler( + fun () -> error end, + fun () -> + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) + end) + end). + +run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. needs_confirming(#delivery{ msg_seq_no = undefined }, _State) -> never; @@ -430,18 +441,19 @@ reply(Reply, State) -> {NewState, Timeout} = next_state(State), {reply, Reply, NewState, Timeout}. -next_state(State) -> - State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = - ensure_rate_timer(State), - case BQ:needs_idle_timeout(BQS) of +next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + State1 = ensure_rate_timer( + confirm_messages(MsgIds, State #state { + backing_queue_state = BQS1 })), + case BQ:needs_idle_timeout(BQS1) of true -> {ensure_sync_timer(State1), 0}; false -> {stop_sync_timer(State1), hibernate} end. %% copied+pasted from amqqueue_process backing_queue_idle_timeout(State = #state { backing_queue = BQ }) -> - maybe_run_queue_via_backing_queue( - BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State). ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> {ok, TRef} = timer:apply_after( diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e9b8a020..6f5abe3e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2080,6 +2080,10 @@ test_queue_index() -> passed. +variable_queue_init(Q, Recover) -> + rabbit_variable_queue:init( + Q, Recover, fun nop/1, fun nop/1, fun nop/2, fun nop/1). + variable_queue_publish(IsPersistent, Count, VQ) -> lists:foldl( fun (_N, VQN) -> @@ -2114,8 +2118,7 @@ test_amqqueue(Durable) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_amqqueue(true), false, - fun nop/2, fun nop/1), + VQ = variable_queue_init(test_amqqueue(true), false), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -2290,8 +2293,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true, - fun nop/2, fun nop/1), + VQ7 = variable_queue_init(test_amqqueue(true), true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2308,8 +2310,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true, - fun nop/2, fun nop/1), + VQ7 = variable_queue_init(test_amqqueue(true), true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2340,8 +2341,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(Q, true, - fun nop/2, fun nop/1), + VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c9d96db7..9704668e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,9 +16,9 @@ -module(rabbit_variable_queue). --export([init/2, terminate/1, delete_and_terminate/1, - purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, +-export([init/4, terminate/1, delete_and_terminate/1, + purge/1, publish/4, publish_delivered/5, drain_confirmed/1, + fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, @@ -27,7 +27,7 @@ -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/4]). +-export([start_msg_store/2, stop_msg_store/0, init/6]). %%---------------------------------------------------------------------------- %% Definitions: @@ -238,6 +238,9 @@ durable, transient_threshold, + async_callback, + sync_callback, + len, persistent_count, @@ -252,6 +255,7 @@ msgs_on_disk, msg_indices_on_disk, unconfirmed, + confirmed, ack_out_counter, ack_in_counter, ack_rates @@ -332,11 +336,14 @@ {any(), binary()}}, on_sync :: sync(), durable :: boolean(), + transient_threshold :: non_neg_integer(), + + async_callback :: async_callback(), + sync_callback :: sync_callback(), len :: non_neg_integer(), persistent_count :: non_neg_integer(), - transient_threshold :: non_neg_integer(), target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), @@ -347,6 +354,7 @@ msgs_on_disk :: gb_set(), msg_indices_on_disk :: gb_set(), unconfirmed :: gb_set(), + confirmed :: gb_set(), ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). @@ -397,27 +405,26 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(Queue, Recover) -> - Self = self(), - init(Queue, Recover, +init(Queue, Recover, AsyncCallback, SyncCallback) -> + init(Queue, Recover, AsyncCallback, SyncCallback, fun (MsgIds, ActionTaken) -> - msgs_written_to_disk(Self, MsgIds, ActionTaken) + msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) end, - fun (MsgIds) -> msg_indices_written_to_disk(Self, MsgIds) end). + fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). init(#amqqueue { name = QueueName, durable = IsDurable }, false, - MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], + init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback, case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun); + MsgOnDiskFun, AsyncCallback); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(#amqqueue { name = QueueName }, true, - MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName, durable = true }, true, + AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -427,9 +434,9 @@ init(#amqqueue { name = QueueName }, true, _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun), + MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef, - undefined), + undefined, AsyncCallback), {DeltaCount, IndexState} = rabbit_queue_index:recover( QueueName, Terms1, @@ -438,7 +445,7 @@ init(#amqqueue { name = QueueName }, true, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, + init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, PersistentClient, TransientClient). terminate(State) -> @@ -513,9 +520,10 @@ publish(Msg, MsgProps, _ChPid, State) -> publish_delivered(false, #basic_message { id = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, State = #vqstate { len = 0 }) -> + _ChPid, State = #vqstate { async_callback = Callback, + len = 0 }) -> case NeedsConfirming of - true -> blind_confirm(self(), gb_sets:singleton(MsgId)); + true -> blind_confirm(Callback, gb_sets:singleton(MsgId)); false -> ok end, {undefined, a(State)}; @@ -523,14 +531,13 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, - State = #vqstate { len = 0, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - unconfirmed = UC }) -> + _ChPid, State = #vqstate { len = 0, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, @@ -545,6 +552,9 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, persistent_count = PCount1, unconfirmed = UC1 }))}. +drain_confirmed(State = #vqstate { confirmed = C }) -> + {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. + dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), a(State1). @@ -689,6 +699,8 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable, tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable, + async_callback = AsyncCallback, + sync_callback = SyncCallback, msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), @@ -697,10 +709,13 @@ tx_commit(Txn, Fun, MsgPropsFun, HasPersistentPubs = PersistentMsgIds =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of - true -> ok = msg_store_sync( - MSCState, true, PersistentMsgIds, - msg_store_callback(PersistentMsgIds, Pubs, AckTags1, - Fun, MsgPropsFun)), + true -> MsgStoreCallback = + fun () -> msg_store_callback( + PersistentMsgIds, Pubs, AckTags1, Fun, + MsgPropsFun, AsyncCallback, SyncCallback) + end, + ok = msg_store_sync(MSCState, true, PersistentMsgIds, + fun () -> spawn(MsgStoreCallback) end), State; false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, Fun, MsgPropsFun, State) @@ -866,7 +881,7 @@ status(#vqstate { {avg_ack_egress_rate , AvgAckEgressRate} ]. invoke(?MODULE, Fun, State) -> - Fun(State). + Fun(?MODULE, State). validate_message(_Msg, State) -> {valid, State}. @@ -939,13 +954,13 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore, MsgOnDiskFun) -> - msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun). +msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> + msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback). -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) -> - rabbit_msg_store:client_init( - MsgStore, Ref, MsgOnDiskFun, - msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)). +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> + CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), + rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun, + fun () -> Callback(?MODULE, CloseFDsFun) end). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( @@ -978,15 +993,9 @@ msg_store_close_fds(MSCState, IsPersistent) -> fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). msg_store_close_fds_fun(IsPersistent) -> - Self = self(), - fun () -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - Self, ?MODULE, - fun (State = #vqstate { msg_store_clients = MSCState }) -> - {ok, MSCState1} = - msg_store_close_fds(MSCState, IsPersistent), - {[], State #vqstate { msg_store_clients = MSCState1 }} - end) + fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) -> + {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), + State #vqstate { msg_store_clients = MSCState1 } end. maybe_write_delivered(false, _SeqId, IndexState) -> @@ -1072,7 +1081,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %%---------------------------------------------------------------------------- init(IsDurable, IndexState, DeltaCount, Terms, - PersistentClient, TransientClient) -> + AsyncCallback, SyncCallback, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), @@ -1098,6 +1107,9 @@ init(IsDurable, IndexState, DeltaCount, Terms, durable = IsDurable, transient_threshold = NextSeqId, + async_callback = AsyncCallback, + sync_callback = SyncCallback, + len = DeltaCount1, persistent_count = DeltaCount1, @@ -1112,6 +1124,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), unconfirmed = gb_sets:new(), + confirmed = gb_sets:new(), ack_out_counter = 0, ack_in_counter = 0, ack_rates = blank_rate(Now, 0) }, @@ -1124,24 +1137,20 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. -msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun) -> - Self = self(), - F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, ?MODULE, - fun (StateN) -> {[], tx_commit_post_msg_store( - true, Pubs, AckTags, - Fun, MsgPropsFun, StateN)} - end) - end, - fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( - fun () -> remove_persistent_messages( - PersistentMsgIds) - end, F) - end) +msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, + AsyncCallback, SyncCallback) -> + case SyncCallback(?MODULE, + fun (?MODULE, StateN) -> + tx_commit_post_msg_store(true, Pubs, AckTags, + Fun, MsgPropsFun, StateN) + end) of + ok -> ok; + error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback) end. -remove_persistent_messages(MsgIds) -> - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined), +remove_persistent_messages(MsgIds, AsyncCallback) -> + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, + undefined, AsyncCallback), ok = rabbit_msg_store:remove(MsgIds, PersistentClient), rabbit_msg_store:client_delete_and_terminate(PersistentClient). @@ -1432,12 +1441,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) -> false -> State end. -remove_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> +record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC, + confirmed = C }) -> State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet), msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet), - unconfirmed = gb_sets:difference(UC, MsgIdSet) }. + unconfirmed = gb_sets:difference(UC, MsgIdSet), + confirmed = gb_sets:union (C, MsgIdSet) }. needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, unconfirmed = UC }) -> @@ -1454,40 +1465,35 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, %% subtraction. not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). -msgs_confirmed(MsgIdSet, State) -> - {gb_sets:to_list(MsgIdSet), remove_confirms(MsgIdSet, State)}. - -blind_confirm(QPid, MsgIdSet) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, ?MODULE, fun (State) -> msgs_confirmed(MsgIdSet, State) end). - -msgs_written_to_disk(QPid, MsgIdSet, removed) -> - blind_confirm(QPid, MsgIdSet); -msgs_written_to_disk(QPid, MsgIdSet, written) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, ?MODULE, - fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:union( - MOD, gb_sets:intersection(UC, MsgIdSet)) }) - end). - -msg_indices_written_to_disk(QPid, MsgIdSet) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, ?MODULE, - fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:union( - MIOD, gb_sets:intersection(UC, MsgIdSet)) }) - end). +blind_confirm(Callback, MsgIdSet) -> + Callback(?MODULE, + fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). + +msgs_written_to_disk(Callback, MsgIdSet, removed) -> + blind_confirm(Callback, MsgIdSet); +msgs_written_to_disk(Callback, MsgIdSet, written) -> + Callback(?MODULE, + fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + Confirmed = gb_sets:intersection(UC, MsgIdSet), + record_confirms(gb_sets:intersection(MsgIdSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:union(MOD, Confirmed) }) + end). + +msg_indices_written_to_disk(Callback, MsgIdSet) -> + Callback(?MODULE, + fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + Confirmed = gb_sets:intersection(UC, MsgIdSet), + record_confirms(gb_sets:intersection(MsgIdSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:union(MIOD, Confirmed) }) + end). %%---------------------------------------------------------------------------- %% Phase changes |