summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-12-14 11:51:41 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-12-14 11:51:41 +0000
commit01db5e5c77b2bccf0133ff17c836b269d5931562 (patch)
tree4a2c775925ce34c64643ec5fcdf578a37ab2ed61
parenta381c38a2965c0888589635c233afd7298838be9 (diff)
downloadrabbitmq-server-01db5e5c77b2bccf0133ff17c836b269d5931562.tar.gz
Better maxdepth checking
-rw-r--r--src/rabbit_amqqueue_process.erl37
1 files changed, 16 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0de9b4e4..b61df6d6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -584,28 +584,23 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
State2#q{backing_queue_state = BQS1})
end.
-publish_max(Message, Props, Delivered, SenderPid,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- max_depth = undefined }) ->
+publish_max(Message, Props, Delivered, SenderPid, #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ max_depth = undefined }) ->
BQ:publish(Message, Props, Delivered, SenderPid, BQS);
-publish_max(Message, Props, Delivered, SenderPid,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- max_depth = MaxDepth }) ->
- Depth = BQ:depth(BQS),
- case Depth >= MaxDepth of
- true ->
- Length = BQ:len(BQS),
- case Length >= MaxDepth of
- false ->
- BQS;
- true ->
- {M, BQS1} = BQ:fetch(false, BQS),
- BQ:publish(Message, Props, Delivered, SenderPid, BQS1)
- end;
- false->
- BQ:publish(Message, Props, Delivered, SenderPid, BQS)
+publish_max(Message, Props, Delivered, SenderPid, #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ dlx = XName,
+ max_depth = MaxDepth }) ->
+ {Depth, Len} = {BQ:depth(BQS), BQ:len(BQS)},
+ case {Depth >= MaxDepth, Len =:= 0} of
+ {false, _} ->
+ BQ:publish(Message, Props, Delivered, SenderPid, BQS);
+ {true, true} ->
+ BQS;
+ {true, false} ->
+ {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(false, BQS),
+ BQ:publish(Message, Props, Delivered, SenderPid, BQS1)
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,