summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-09 13:46:25 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-09 13:46:25 +0000
commit5ee0a3b8b57aca3f34c39bc9d824fa2f4b716ef5 (patch)
tree904fd7419ec985989d6fa9efea6af3e3db1c0e9a
parentf0ceca7de46e4d70ec74020fff48dded3096c09e (diff)
parent84016d39416c8b5b08783cdbf4fbbdaacdb82aeb (diff)
downloadrabbitmq-server-5ee0a3b8b57aca3f34c39bc9d824fa2f4b716ef5.tar.gz
Merge bug25951
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_channel.erl102
-rw-r--r--src/rabbit_limiter.erl12
3 files changed, 57 insertions, 64 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bd258ecf..c75565d3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -575,10 +575,9 @@ requeue(AckTags, ChPid, State) ->
possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of
- unchanged ->
- State;
- {unblocked, Consumers1} ->
- run_message_queue(true, State#q{consumers = Consumers1})
+ unchanged -> State;
+ {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
+ run_message_queue(true, State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4d778f94..eded8a90 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -694,12 +694,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
multiple = Multiple,
- requeue = Requeue},
- _, State) ->
+ requeue = Requeue}, _, State) ->
reject(DeliveryTag, Requeue, Multiple, State);
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
- multiple = Multiple},
+ multiple = Multiple},
_, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
@@ -710,8 +709,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
State1#ch{tx = {Msgs, Acks1}}
end};
-handle_method(#'basic.get'{queue = QueueNameBin,
- no_ack = NoAck},
+handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
conn_pid = ConnPid,
limiter = Limiter,
@@ -749,9 +747,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait,
arguments = Args},
- _, State = #ch{conn_pid = ConnPid,
- limiter = Limiter,
- consumer_mapping = ConsumerMapping}) ->
+ _, State = #ch{conn_pid = ConnPid,
+ limiter = Limiter,
+ consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -801,8 +799,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag])
end;
-handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
- nowait = NoWait},
+handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
_, State = #ch{consumer_mapping = ConsumerMapping,
queue_consumers = QCons}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
@@ -849,13 +846,13 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
-handle_method(#'basic.qos'{prefetch_count = 0}, _,
- State = #ch{limiter = Limiter}) ->
+handle_method(#'basic.qos'{prefetch_count = 0},
+ _, State = #ch{limiter = Limiter}) ->
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
-handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
- State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
%% TODO queue:len(UAMQ) is not strictly right since that counts
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
@@ -864,8 +861,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
handle_method(#'basic.recover_async'{requeue = true},
- _, State = #ch{unacked_message_q = UAMQ,
- limiter = Limiter}) ->
+ _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
OkFun = fun () -> ok end,
UAMQL = queue:to_list(UAMQ),
foreach_per_queue(
@@ -887,19 +883,18 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
Content, State),
{reply, #'basic.recover_ok'{}, State1};
-handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
- requeue = Requeue},
+handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue},
_, State) ->
reject(DeliveryTag, Requeue, false, State);
-handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- type = TypeNameBin,
- passive = false,
- durable = Durable,
+handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
+ type = TypeNameBin,
+ passive = false,
+ durable = Durable,
auto_delete = AutoDelete,
- internal = Internal,
- nowait = NoWait,
- arguments = Args},
+ internal = Internal,
+ nowait = NoWait,
+ arguments = Args},
_, State = #ch{virtual_host = VHostPath}) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -932,17 +927,17 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- passive = true,
- nowait = NoWait},
+ passive = true,
+ nowait = NoWait},
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName),
return_ok(State, NoWait, #'exchange.declare_ok'{});
-handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
+handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused,
- nowait = NoWait},
+ nowait = NoWait},
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_not_default_exchange(ExchangeName),
@@ -958,19 +953,19 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
end;
handle_method(#'exchange.bind'{destination = DestinationNameBin,
- source = SourceNameBin,
+ source = SourceNameBin,
routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:add/2,
SourceNameBin, exchange, DestinationNameBin, RoutingKey,
Arguments, #'exchange.bind_ok'{}, NoWait, State);
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
- source = SourceNameBin,
+ source = SourceNameBin,
routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:remove/2,
SourceNameBin, exchange, DestinationNameBin, RoutingKey,
Arguments, #'exchange.unbind_ok'{}, NoWait, State);
@@ -1062,10 +1057,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
-handle_method(#'queue.delete'{queue = QueueNameBin,
+handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
- if_empty = IfEmpty,
- nowait = NoWait},
+ if_empty = IfEmpty,
+ nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
@@ -1087,25 +1082,24 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
#'queue.delete_ok'{message_count = PurgedMessageCount})
end;
-handle_method(#'queue.bind'{queue = QueueNameBin,
- exchange = ExchangeNameBin,
+handle_method(#'queue.bind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:add/2,
ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
#'queue.bind_ok'{}, NoWait, State);
-handle_method(#'queue.unbind'{queue = QueueNameBin,
- exchange = ExchangeNameBin,
+handle_method(#'queue.unbind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
routing_key = RoutingKey,
- arguments = Arguments}, _, State) ->
+ arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:remove/2,
ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
#'queue.unbind_ok'{}, false, State);
-handle_method(#'queue.purge'{queue = QueueNameBin,
- nowait = NoWait},
+handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
@@ -1153,15 +1147,15 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'channel.flow'{active = true}, _,
- State = #ch{limiter = Limiter}) ->
+handle_method(#'channel.flow'{active = true},
+ _, State = #ch{limiter = Limiter}) ->
Limiter1 = rabbit_limiter:unblock(Limiter),
{reply, #'channel.flow_ok'{active = true},
maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
-handle_method(#'channel.flow'{active = false}, _,
- State = #ch{consumer_mapping = Consumers,
- limiter = Limiter}) ->
+handle_method(#'channel.flow'{active = false},
+ _, State = #ch{consumer_mapping = Consumers,
+ limiter = Limiter}) ->
case rabbit_limiter:is_blocked(Limiter) of
true -> {noreply, maybe_send_flow_ok(State)};
false -> Limiter1 = rabbit_limiter:block(Limiter),
@@ -1186,8 +1180,8 @@ handle_method(#'channel.flow'{active = false}, _,
handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
- drain = Drain}, _,
- State = #ch{consumer_mapping = Consumers}) ->
+ drain = Drain},
+ _, State = #ch{consumer_mapping = Consumers}) ->
case dict:find(CTag, Consumers) of
{ok, Q} -> ok = rabbit_amqqueue:credit(
Q, self(), CTag, Credit, Drain),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 4e1e299c..2857ca55 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -245,9 +245,9 @@ can_send(L = #qstate{pid = Pid, state = State, credits = Credits},
case is_consumer_blocked(L, CTag) of
false -> case (State =/= active orelse
safe_call(Pid, {can_send, self(), AckRequired}, true)) of
- true -> {continue, L#qstate{
- credits = record_send_q(CTag, Credits)}};
- false -> {suspend, L#qstate{state = suspended}}
+ true -> Credits1 = decrement_credit(CTag, Credits),
+ {continue, L#qstate{credits = Credits1}};
+ false -> {suspend, L#qstate{state = suspended}}
end;
true -> {suspend, L}
end.
@@ -271,9 +271,9 @@ is_suspended(#qstate{}) -> false.
is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
case gb_trees:lookup(CTag, Credits) of
+ none -> false;
{value, #credit{credit = C}} when C > 0 -> false;
- {value, #credit{}} -> true;
- none -> false
+ {value, #credit{}} -> true
end.
credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) ->
@@ -305,7 +305,7 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
%% state for us (#qstate.credits), and maintain a fiction that the
%% limiter is making the decisions...
-record_send_q(CTag, Credits) ->
+decrement_credit(CTag, Credits) ->
case gb_trees:lookup(CTag, Credits) of
{value, #credit{credit = Credit, drain = Drain}} ->
update_credit(CTag, Credit - 1, Drain, Credits);