From d0ccf58dec4511707ad10e6a7b8970c34c2e52ad Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 1 Oct 2012 20:04:48 +0100 Subject: requeue unacked messages earlier during promotion ...so that the set_delivered marker is set correctly. Note that - there is no need to sort AckTags - BQ:requeue doesn't care about the order in which it is given the tags. - we don't need to 'run the backing queue' / 'run the message queue' when requeuing since there are no consumers. But we *do* need to drop expired messages; process_args takes care of that already. --- src/rabbit_amqqueue_process.erl | 12 +++++------- src/rabbit_mirror_queue_master.erl | 18 +++++++++++------- src/rabbit_mirror_queue_slave.erl | 9 ++++----- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a0e74b42..a79a2ee0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([start_link/1, info_keys/0]). --export([init_with_backing_queue_state/8]). +-export([init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -76,8 +76,8 @@ -spec(start_link/1 :: (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). --spec(init_with_backing_queue_state/8 :: - (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], +-spec(init_with_backing_queue_state/7 :: + (rabbit_types:amqqueue(), atom(), tuple(), any(), [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}). -endif. @@ -144,7 +144,7 @@ init(Q) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, AckTags, Deliveries, Senders, MTC) -> + RateTRef, Deliveries, Senders, MTC) -> case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -166,9 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, delayed_stop = undefined, queue_monitors = pmon:new(), msg_id_to_channel = MTC}, - State1 = requeue_and_run(AckTags, process_args( - rabbit_event:init_stats_timer( - State, #q.stats_timer))), + State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)), lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) end, State1, Deliveries). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4cfb3dcb..5d302329 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -25,7 +25,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). +-export([promote_backing_queue_state/7, sender_death_fun/0, length_fun/0]). -behaviour(rabbit_backing_queue). @@ -59,8 +59,9 @@ known_senders :: set() }). --spec(promote_backing_queue_state/6 :: - (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). +-spec(promote_backing_queue_state/7 :: + (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) -> + master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). -spec(length_fun/0 :: () -> length_fun()). @@ -372,13 +373,16 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, %% Other exported functions %% --------------------------------------------------------------------------- -promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> - Len = BQ:len(BQS), - ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), +promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) -> + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + Len = BQ:len(BQS1), + Depth = BQ:depth(BQS1), + true = Len == Depth, %% ASSERTION: everything must have been requeued + ok = gm:broadcast(GM, {depth, Depth}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, - backing_queue_state = BQS, + backing_queue_state = BQS1, set_delivered = Len, seen_status = SeenStatus, confirmed = [], diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 8e541db1..1832049d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -287,7 +287,7 @@ terminate(Reason, #state { q = Q, rate_timer_ref = RateTRef }) -> ok = gm:leave(GM), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()), + Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()), rabbit_amqqueue_process:terminate(Reason, QueueState); terminate([_SPid], _Reason) -> %% gm case @@ -520,22 +520,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, [{MsgId, Status} || {MsgId, {Status, _ChPid}} <- MSList, Status =:= published orelse Status =:= confirmed]), + AckTags = [AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS, MPids), + CPid, BQ, BQS, GM, AckTags, SS, MPids), MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); (_, MTC0) -> MTC0 end, gb_trees:empty(), MSList), - NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], - AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, KS, MTC), + Deliveries, KS, MTC), {become, rabbit_amqqueue_process, QueueState, hibernate}. noreply(State) -> -- cgit v1.2.1 From 72e5fb833f2f0c16d4e13e6dd00641a7a44bb32a Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 2 Oct 2012 11:35:11 +0100 Subject: remove gratuitous ack numbering the sole purpose of which was to sort the acktags by that number, which serves no purpose whatsoever. --- src/rabbit_mirror_queue_slave.erl | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1832049d..2a58e897 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -72,7 +72,6 @@ sender_queues, %% :: Pid -> {Q Msg, Set MsgId} msg_id_ack, %% :: MsgId -> AckTag - ack_num, msg_id_status, known_senders, @@ -125,7 +124,6 @@ init(#amqqueue { name = QueueName } = Q) -> sender_queues = dict:new(), msg_id_ack = dict:new(), - ack_num = 0, msg_id_status = dict:new(), known_senders = pmon:new(), @@ -520,7 +518,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, [{MsgId, Status} || {MsgId, {Status, _ChPid}} <- MSList, Status =:= published orelse Status =:= confirmed]), - AckTags = [AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], + AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( CPid, BQ, BQS, GM, AckTags, SS, MPids), @@ -862,19 +860,16 @@ msg_ids_to_acktags(MsgIds, MA) -> lists:foldl( fun (MsgId, {Acc, MAN}) -> case dict:find(MsgId, MA) of - error -> {Acc, MAN}; - {ok, {_Num, AckTag}} -> {[AckTag | Acc], - dict:erase(MsgId, MAN)} + error -> {Acc, MAN}; + {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)} end end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. maybe_store_ack(false, _MsgId, _AckTag, State) -> State; -maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, - ack_num = Num }) -> - State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), - ack_num = Num + 1 }. +maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) -> + State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }. set_delta(0, State = #state { depth_delta = undefined }) -> ok = record_synchronised(State#state.q), -- cgit v1.2.1