summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 18:39:00 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 18:39:00 +0100
commit3e464a656425438308529f5d1decd42da5060f0f (patch)
tree67001cc7f499103d61e8c524e93fed6aa20b6fc9
parentbb1ba4a5c9f3d2219bffdb4cd6ce3f9ab202e97d (diff)
downloadrabbitmq-server-3e464a656425438308529f5d1decd42da5060f0f.tar.gz
simplify `set_length' case, fixing the ToDrop < 0 branch
The unknown pending counter wasn't updated correctly
-rw-r--r--src/rabbit_mirror_queue_slave.erl36
1 files changed, 16 insertions, 20 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 77f3d11f..75c88b15 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -815,26 +815,22 @@ process_instruction({set_length, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- ToDrop = QLen - Length,
- {ok,
- case ToDrop >= 0 of
- true ->
- State1 =
- lists:foldl(
- fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
- BQ:fetch(AckRequired, BQSN),
- maybe_store_ack(
- AckRequired, MsgId, AckTag,
- StateN #state { backing_queue_state = BQSN1 })
- end, State, lists:duplicate(ToDrop, const)),
- case AckRequired of
- true -> set_synchronised(Dropped - ToDrop, Length, State1);
- false -> set_synchronised(Length, State1)
- end;
- false ->
- set_synchronised(Length, State)
- end};
+ ToDrop = case QLen - Length of
+ N when N > 0 -> N;
+ _ -> 0
+ end,
+ State1 = lists:foldl(
+ fun (const, StateN = #state{backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
+ BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ {ok, case AckRequired of
+ true -> set_synchronised(Dropped - ToDrop, Length, State1);
+ false -> set_synchronised(Length, State1)
+ end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,