summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 16:03:38 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 16:03:38 +0100
commit4a60868b78268113c92a77a8d375afe502f314bf (patch)
treefd00bc4da6876ee29210a5378c719648a12757b6
parentc43a77640b11446be3fdc276930b771acb4f2dfc (diff)
downloadrabbitmq-server-4a60868b78268113c92a77a8d375afe502f314bf.tar.gz
do not track external pendings until we receive `length'
Otherwise, we might break some assertions.
-rw-r--r--src/rabbit_mirror_queue_slave.erl9
1 files changed, 7 insertions, 2 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 38bbf59f..4e153ca1 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -133,7 +133,7 @@ init(#amqqueue { name = QueueName } = Q) ->
known_senders = pmon:new(),
synchronised = false,
- external_pending = 0
+ external_pending = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -849,7 +849,10 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
{_, false} when QLen =< Remaining ->
set_synchronised(Remaining, State);
{_, true} when QLen =< Remaining ->
- State #state { external_pending = ExtPending + 1}
+ State #state { external_pending = case ExtPending of
+ undefined -> undefined;
+ _ -> ExtPending + 1
+ end }
end};
process_instruction({ack, MsgIds, Length},
State = #state { backing_queue = BQ,
@@ -915,6 +918,8 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+set_synchronised(_, _, _, State = #state { external_pending = undefined }) ->
+ State;
set_synchronised(LocalPending, RemotePending, Length,
State = #state { backing_queue = BQ,
backing_queue_state = BQS,