summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-10-02 15:58:34 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-10-02 15:58:34 +0100
commit16867a384864fbdef705828e2a361352be1e158c (patch)
treeb6a4ad4da8f033701e968e969d878bed4ed04344
parentc16e977dcec99d89fb31abb766745d9a1cab61cf (diff)
parent72e5fb833f2f0c16d4e13e6dd00641a7a44bb32a (diff)
downloadrabbitmq-server-bug25198.tar.gz
Merge in defaultbug25198
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_mirror_queue_master.erl18
-rw-r--r--src/rabbit_mirror_queue_slave.erl24
3 files changed, 25 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e8d8fa5e..dfd0ab7e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -26,7 +26,7 @@
-export([start_link/1, info_keys/0]).
--export([init_with_backing_queue_state/8]).
+-export([init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -76,8 +76,8 @@
-spec(start_link/1 ::
(rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(init_with_backing_queue_state/8 ::
- (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
+-spec(init_with_backing_queue_state/7 ::
+ (rabbit_types:amqqueue(), atom(), tuple(), any(),
[rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
-endif.
@@ -144,7 +144,7 @@ init(Q) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
- RateTRef, AckTags, Deliveries, Senders, MTC) ->
+ RateTRef, Deliveries, Senders, MTC) ->
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -166,9 +166,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
delayed_stop = undefined,
queue_monitors = pmon:new(),
msg_id_to_channel = MTC},
- State1 = requeue_and_run(AckTags, process_args(
- rabbit_event:init_stats_timer(
- State, #q.stats_timer))),
+ State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State1, Deliveries).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index cfef98b7..c2bbcf92 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -25,7 +25,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]).
+-export([promote_backing_queue_state/7, sender_death_fun/0, length_fun/0]).
-export([init_with_existing_bq/3, stop_mirroring/1]).
@@ -61,8 +61,9 @@
known_senders :: set()
}).
--spec(promote_backing_queue_state/6 ::
- (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
+-spec(promote_backing_queue_state/7 ::
+ (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) ->
+ master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
-spec(length_fun/0 :: () -> length_fun()).
-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
@@ -377,13 +378,16 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
%% Other exported functions
%% ---------------------------------------------------------------------------
-promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
- Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
+promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ Len = BQ:len(BQS1),
+ Depth = BQ:depth(BQS1),
+ true = Len == Depth, %% ASSERTION: everything must have been requeued
+ ok = gm:broadcast(GM, {depth, Depth}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
- backing_queue_state = BQS,
+ backing_queue_state = BQS1,
set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b4b0d4d3..c74470f6 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -72,7 +72,6 @@
sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
msg_id_ack, %% :: MsgId -> AckTag
- ack_num,
msg_id_status,
known_senders,
@@ -125,7 +124,6 @@ init(#amqqueue { name = QueueName } = Q) ->
sender_queues = dict:new(),
msg_id_ack = dict:new(),
- ack_num = 0,
msg_id_status = dict:new(),
known_senders = pmon:new(),
@@ -295,7 +293,7 @@ terminate(Reason, #state { q = Q,
rate_timer_ref = RateTRef }) ->
ok = gm:leave(GM),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()),
+ Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()),
rabbit_amqqueue_process:terminate(Reason, QueueState);
terminate([_SPid], _Reason) ->
%% gm case
@@ -528,22 +526,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
[{MsgId, Status}
|| {MsgId, {Status, _ChPid}} <- MSList,
Status =:= published orelse Status =:= confirmed]),
+ AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, SS, MPids),
+ CPid, BQ, BQS, GM, AckTags, SS, MPids),
MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
(_, MTC0) ->
MTC0
end, gb_trees:empty(), MSList),
- NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
- AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags,
- Deliveries, KS, MTC).
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS,
+ MTC).
noreply(State) ->
{NewState, Timeout} = next_state(State),
@@ -870,19 +867,16 @@ msg_ids_to_acktags(MsgIds, MA) ->
lists:foldl(
fun (MsgId, {Acc, MAN}) ->
case dict:find(MsgId, MA) of
- error -> {Acc, MAN};
- {ok, {_Num, AckTag}} -> {[AckTag | Acc],
- dict:erase(MsgId, MAN)}
+ error -> {Acc, MAN};
+ {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
-maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
- ack_num = Num }) ->
- State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
- ack_num = Num + 1 }.
+maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
+ State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }.
set_delta(0, State = #state { depth_delta = undefined }) ->
ok = record_synchronised(State#state.q),