diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-01 20:04:48 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-01 20:04:48 +0100 |
commit | d0ccf58dec4511707ad10e6a7b8970c34c2e52ad (patch) | |
tree | 318cec6c3dbfe86dd60585216a3e6e3c32b8f432 | |
parent | 4b6a2777580d67ab069c86349f32f832feff8fc5 (diff) | |
download | rabbitmq-server-d0ccf58dec4511707ad10e6a7b8970c34c2e52ad.tar.gz |
requeue unacked messages earlier during promotion
...so that the set_delivered marker is set correctly.
Note that
- there is no need to sort AckTags - BQ:requeue doesn't care
about the order in which it is given the tags.
- we don't need to 'run the backing queue' / 'run the message queue'
when requeuing since there are no consumers. But we *do* need to
drop expired messages; process_args takes care of that already.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 18 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 9 |
3 files changed, 20 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a0e74b42..a79a2ee0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([start_link/1, info_keys/0]). --export([init_with_backing_queue_state/8]). +-export([init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -76,8 +76,8 @@ -spec(start_link/1 :: (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). --spec(init_with_backing_queue_state/8 :: - (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], +-spec(init_with_backing_queue_state/7 :: + (rabbit_types:amqqueue(), atom(), tuple(), any(), [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}). -endif. @@ -144,7 +144,7 @@ init(Q) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, AckTags, Deliveries, Senders, MTC) -> + RateTRef, Deliveries, Senders, MTC) -> case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -166,9 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, delayed_stop = undefined, queue_monitors = pmon:new(), msg_id_to_channel = MTC}, - State1 = requeue_and_run(AckTags, process_args( - rabbit_event:init_stats_timer( - State, #q.stats_timer))), + State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)), lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) end, State1, Deliveries). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4cfb3dcb..5d302329 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -25,7 +25,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). +-export([promote_backing_queue_state/7, sender_death_fun/0, length_fun/0]). -behaviour(rabbit_backing_queue). @@ -59,8 +59,9 @@ known_senders :: set() }). --spec(promote_backing_queue_state/6 :: - (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). +-spec(promote_backing_queue_state/7 :: + (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) -> + master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). -spec(length_fun/0 :: () -> length_fun()). @@ -372,13 +373,16 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, %% Other exported functions %% --------------------------------------------------------------------------- -promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> - Len = BQ:len(BQS), - ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), +promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) -> + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + Len = BQ:len(BQS1), + Depth = BQ:depth(BQS1), + true = Len == Depth, %% ASSERTION: everything must have been requeued + ok = gm:broadcast(GM, {depth, Depth}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, - backing_queue_state = BQS, + backing_queue_state = BQS1, set_delivered = Len, seen_status = SeenStatus, confirmed = [], diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 8e541db1..1832049d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -287,7 +287,7 @@ terminate(Reason, #state { q = Q, rate_timer_ref = RateTRef }) -> ok = gm:leave(GM), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()), + Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()), rabbit_amqqueue_process:terminate(Reason, QueueState); terminate([_SPid], _Reason) -> %% gm case @@ -520,22 +520,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, [{MsgId, Status} || {MsgId, {Status, _ChPid}} <- MSList, Status =:= published orelse Status =:= confirmed]), + AckTags = [AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS, MPids), + CPid, BQ, BQS, GM, AckTags, SS, MPids), MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); (_, MTC0) -> MTC0 end, gb_trees:empty(), MSList), - NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], - AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, KS, MTC), + Deliveries, KS, MTC), {become, rabbit_amqqueue_process, QueueState, hibernate}. noreply(State) -> |