summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-01-04 15:42:55 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-01-04 15:42:55 +0000
commit93cad02186b4c0ac27a02090a9e0c719ac6dfc8d (patch)
tree6fcec30dbca48972e3a7b7dee567a84ca3bf9ba1
parentd5b173e4611ca319ce8f0d65fdce0e84b879d919 (diff)
downloadrabbitmq-server-93cad02186b4c0ac27a02090a9e0c719ac6dfc8d.tar.gz
Ignore queue depth for maxlen
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl25
2 files changed, 12 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9d6dcd15..92a9f4b3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -415,9 +415,9 @@ check_int_arg({Type, _}, _) ->
check_max_length_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > 0 -> ok;
- ok -> {error, {value_not_positive, Val}};
- Error -> Error
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_negative, Val}};
+ Error -> Error
end.
check_expires_arg({Type, Val}, Args) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6eb40886..a4a30021 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -571,11 +571,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2} ->
- case publish_max(Delivery, Props, Delivered, State2) of
- nopub -> State2;
- BQS1 -> ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
- end
+ BQS1 = publish_max(Delivery, Props, Delivered, State2),
+ ensure_ttl_timer(Props#message_properties.expiry,
+ State2#q{backing_queue_state = BQS1})
end.
publish_max(#delivery{message = Message,
@@ -590,16 +588,11 @@ publish_max(#delivery{message = Message,
Props, Delivered, #q{backing_queue = BQ,
backing_queue_state = BQS,
max_length = MaxLen}) ->
- case {BQ:depth(BQS) >= MaxLen, BQ:len(BQS) =:= 0} of
- {false, _} ->
- BQ:publish(Message, Props, Delivered, SenderPid, BQS);
- {true, true} ->
- (dead_letter_fun(maxlen))([{Message, undefined}]),
- nopub;
- {true, false} ->
- {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS),
- (dead_letter_fun(maxlen))([{Msg, AckTag}]),
- BQ:publish(Message, Props, Delivered, SenderPid, BQS1)
+ case BQ:len(BQS) >= MaxLen of
+ true -> {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS),
+ (dead_letter_fun(maxlen))([{Msg, AckTag}]),
+ BQ:publish(Message, Props, Delivered, SenderPid, BQS1);
+ false -> BQ:publish(Message, Props, Delivered, SenderPid, BQS)
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
@@ -833,7 +826,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
unconfirmed = UC,
backing_queue = BQ,
backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack([Ack || Ack <- AckTags, Ack /= undefined], BQS),
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
State1 = State#q{backing_queue_state = BQS1},
case dtree:is_empty(UC) andalso DS =/= undefined of
true -> case DS of