diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-09 13:46:25 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-09 13:46:25 +0000 |
commit | 5ee0a3b8b57aca3f34c39bc9d824fa2f4b716ef5 (patch) | |
tree | 904fd7419ec985989d6fa9efea6af3e3db1c0e9a | |
parent | f0ceca7de46e4d70ec74020fff48dded3096c09e (diff) | |
parent | 84016d39416c8b5b08783cdbf4fbbdaacdb82aeb (diff) | |
download | rabbitmq-server-5ee0a3b8b57aca3f34c39bc9d824fa2f4b716ef5.tar.gz |
Merge bug25951
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 102 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 12 |
3 files changed, 57 insertions, 64 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index bd258ecf..c75565d3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -575,10 +575,9 @@ requeue(AckTags, ChPid, State) -> possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of - unchanged -> - State; - {unblocked, Consumers1} -> - run_message_queue(true, State#q{consumers = Consumers1}) + unchanged -> State; + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, State1) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4d778f94..eded8a90 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -694,12 +694,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, - requeue = Requeue}, - _, State) -> + requeue = Requeue}, _, State) -> reject(DeliveryTag, Requeue, Multiple, State); handle_method(#'basic.ack'{delivery_tag = DeliveryTag, - multiple = Multiple}, + multiple = Multiple}, _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, @@ -710,8 +709,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, State1#ch{tx = {Msgs, Acks1}} end}; -handle_method(#'basic.get'{queue = QueueNameBin, - no_ack = NoAck}, +handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, conn_pid = ConnPid, limiter = Limiter, @@ -749,9 +747,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{conn_pid = ConnPid, - limiter = Limiter, - consumer_mapping = ConsumerMapping}) -> + _, State = #ch{conn_pid = ConnPid, + limiter = Limiter, + consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -801,8 +799,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag]) end; -handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, - nowait = NoWait}, +handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, _, State = #ch{consumer_mapping = ConsumerMapping, queue_consumers = QCons}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, @@ -849,13 +846,13 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = 0}, _, - State = #ch{limiter = Limiter}) -> +handle_method(#'basic.qos'{prefetch_count = 0}, + _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, - State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> %% TODO queue:len(UAMQ) is not strictly right since that counts %% unacked messages from basic.get too. Pretty obscure though. Limiter1 = rabbit_limiter:limit_prefetch(Limiter, @@ -864,8 +861,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; handle_method(#'basic.recover_async'{requeue = true}, - _, State = #ch{unacked_message_q = UAMQ, - limiter = Limiter}) -> + _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> OkFun = fun () -> ok end, UAMQL = queue:to_list(UAMQ), foreach_per_queue( @@ -887,19 +883,18 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> Content, State), {reply, #'basic.recover_ok'{}, State1}; -handle_method(#'basic.reject'{delivery_tag = DeliveryTag, - requeue = Requeue}, +handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, _, State) -> reject(DeliveryTag, Requeue, false, State); -handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - type = TypeNameBin, - passive = false, - durable = Durable, +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, + type = TypeNameBin, + passive = false, + durable = Durable, auto_delete = AutoDelete, - internal = Internal, - nowait = NoWait, - arguments = Args}, + internal = Internal, + nowait = NoWait, + arguments = Args}, _, State = #ch{virtual_host = VHostPath}) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -932,17 +927,17 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - passive = true, - nowait = NoWait}, + passive = true, + nowait = NoWait}, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName), return_ok(State, NoWait, #'exchange.declare_ok'{}); -handle_method(#'exchange.delete'{exchange = ExchangeNameBin, +handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused, - nowait = NoWait}, + nowait = NoWait}, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_not_default_exchange(ExchangeName), @@ -958,19 +953,19 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, end; handle_method(#'exchange.bind'{destination = DestinationNameBin, - source = SourceNameBin, + source = SourceNameBin, routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> + nowait = NoWait, + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:add/2, SourceNameBin, exchange, DestinationNameBin, RoutingKey, Arguments, #'exchange.bind_ok'{}, NoWait, State); handle_method(#'exchange.unbind'{destination = DestinationNameBin, - source = SourceNameBin, + source = SourceNameBin, routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> + nowait = NoWait, + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, SourceNameBin, exchange, DestinationNameBin, RoutingKey, Arguments, #'exchange.unbind_ok'{}, NoWait, State); @@ -1062,10 +1057,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin, return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); -handle_method(#'queue.delete'{queue = QueueNameBin, +handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, - if_empty = IfEmpty, - nowait = NoWait}, + if_empty = IfEmpty, + nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), @@ -1087,25 +1082,24 @@ handle_method(#'queue.delete'{queue = QueueNameBin, #'queue.delete_ok'{message_count = PurgedMessageCount}) end; -handle_method(#'queue.bind'{queue = QueueNameBin, - exchange = ExchangeNameBin, +handle_method(#'queue.bind'{queue = QueueNameBin, + exchange = ExchangeNameBin, routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> + nowait = NoWait, + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:add/2, ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); -handle_method(#'queue.unbind'{queue = QueueNameBin, - exchange = ExchangeNameBin, +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, routing_key = RoutingKey, - arguments = Arguments}, _, State) -> + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); -handle_method(#'queue.purge'{queue = QueueNameBin, - nowait = NoWait}, +handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), @@ -1153,15 +1147,15 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'channel.flow'{active = true}, _, - State = #ch{limiter = Limiter}) -> +handle_method(#'channel.flow'{active = true}, + _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unblock(Limiter), {reply, #'channel.flow_ok'{active = true}, maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; -handle_method(#'channel.flow'{active = false}, _, - State = #ch{consumer_mapping = Consumers, - limiter = Limiter}) -> +handle_method(#'channel.flow'{active = false}, + _, State = #ch{consumer_mapping = Consumers, + limiter = Limiter}) -> case rabbit_limiter:is_blocked(Limiter) of true -> {noreply, maybe_send_flow_ok(State)}; false -> Limiter1 = rabbit_limiter:block(Limiter), @@ -1186,8 +1180,8 @@ handle_method(#'channel.flow'{active = false}, _, handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, - drain = Drain}, _, - State = #ch{consumer_mapping = Consumers}) -> + drain = Drain}, + _, State = #ch{consumer_mapping = Consumers}) -> case dict:find(CTag, Consumers) of {ok, Q} -> ok = rabbit_amqqueue:credit( Q, self(), CTag, Credit, Drain), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 4e1e299c..2857ca55 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -245,9 +245,9 @@ can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, case is_consumer_blocked(L, CTag) of false -> case (State =/= active orelse safe_call(Pid, {can_send, self(), AckRequired}, true)) of - true -> {continue, L#qstate{ - credits = record_send_q(CTag, Credits)}}; - false -> {suspend, L#qstate{state = suspended}} + true -> Credits1 = decrement_credit(CTag, Credits), + {continue, L#qstate{credits = Credits1}}; + false -> {suspend, L#qstate{state = suspended}} end; true -> {suspend, L} end. @@ -271,9 +271,9 @@ is_suspended(#qstate{}) -> false. is_consumer_blocked(#qstate{credits = Credits}, CTag) -> case gb_trees:lookup(CTag, Credits) of + none -> false; {value, #credit{credit = C}} when C > 0 -> false; - {value, #credit{}} -> true; - none -> false + {value, #credit{}} -> true end. credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) -> @@ -305,7 +305,7 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> %% state for us (#qstate.credits), and maintain a fiction that the %% limiter is making the decisions... -record_send_q(CTag, Credits) -> +decrement_credit(CTag, Credits) -> case gb_trees:lookup(CTag, Credits) of {value, #credit{credit = Credit, drain = Drain}} -> update_credit(CTag, Credit - 1, Drain, Credits); |