diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-09 11:32:36 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-09 11:32:36 +0000 |
commit | e68984c2f411371d8d0512557639c30b48ffa8da (patch) | |
tree | e27e301ab937bc1c4576b13433c337a69a1e267f | |
parent | 6212861309962a136e7b2ed0480d77c00dcb395b (diff) | |
parent | 84016d39416c8b5b08783cdbf4fbbdaacdb82aeb (diff) | |
download | rabbitmq-server-e68984c2f411371d8d0512557639c30b48ffa8da.tar.gz |
merge default into bug25938
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 146 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 22 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 102 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 12 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 11 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 62 | ||||
-rw-r--r-- | src/rabbit_queue_decorator.erl | 13 |
7 files changed, 182 insertions, 186 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 281aecb9..d916dccb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -126,7 +126,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3 = lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) end, State2, Deliveries), - notify_decorators(startup, [], State3), + notify_decorators(startup, State3), State3. init_state(Q) -> @@ -188,7 +188,7 @@ declare(Recover, From, State = #q{q = Q, State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), - notify_decorators(startup, [], State), + notify_decorators(startup, State), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -213,18 +213,17 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. -notify_decorators(Event, Props, State) when Event =:= startup; - Event =:= shutdown -> - decorator_callback(qname(State), Event, Props); +maybe_notify_decorators(false, State) -> State; +maybe_notify_decorators(true, State) -> notify_decorators(State), State. -notify_decorators(Event, Props, State = #q{consumers = Consumers, - backing_queue = BQ, - backing_queue_state = BQS}) -> +notify_decorators(Event, State) -> decorator_callback(qname(State), Event, []). + +notify_decorators(State = #q{consumers = Consumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> P = rabbit_queue_consumers:max_active_priority(Consumers), - decorator_callback(qname(State), notify, - [Event, [{max_active_consumer_priority, P}, - {is_empty, BQ:is_empty(BQS)} | - Props]]). + decorator_callback(qname(State), consumer_state_changed, + [P, BQ:is_empty(BQS)]). decorator_callback(QName, F, A) -> %% Look up again in case policy and hence decorators have changed @@ -308,7 +307,7 @@ terminate_shutdown(Fun, State) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), - notify_decorators(shutdown, [], State), + notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _, _} <- rabbit_queue_consumers:all(Consumers)], @@ -401,21 +400,12 @@ is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). maybe_send_drained(WasEmpty, State) -> case (not WasEmpty) andalso is_empty(State) of - true -> notify_decorators(queue_empty, [], State), + true -> notify_decorators(State), rabbit_queue_consumers:send_drained(); false -> ok end, State. -deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) -> - {Active, Blocked, State1, Consumers1} = - rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State, - Consumers), - State2 = State1#q{consumers = Consumers1}, - [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) || - {_ChPid, CTag} <- Blocked], - {Active, State2}. - confirm_messages([], State) -> State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> @@ -461,49 +451,68 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State) -> - {_Active, State3} = deliver_msgs_to_consumers( - fun(AckRequired, State1) -> - {Result, State2} = fetch(AckRequired, State1), - {Result, is_empty(State2), State2} - end, is_empty(State), State), - State3. +run_message_queue(State) -> run_message_queue(false, State). + +run_message_queue(ActiveConsumersChanged, State) -> + case is_empty(State) of + true -> maybe_notify_decorators(ActiveConsumersChanged, State); + false -> case rabbit_queue_consumers:deliver( + fun(AckRequired) -> fetch(AckRequired, State) end, + qname(State), State#q.consumers) of + {delivered, ActiveConsumersChanged1, State1, Consumers} -> + run_message_queue( + ActiveConsumersChanged or ActiveConsumersChanged1, + State1#q{consumers = Consumers}); + {undelivered, ActiveConsumersChanged1, Consumers} -> + maybe_notify_decorators( + ActiveConsumersChanged or ActiveConsumersChanged1, + State#q{consumers = Consumers}) + end + end. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), - State1 = State#q{backing_queue_state = BQS1}, - case IsDuplicate of - false -> deliver_msgs_to_consumers( - fun (true, State2 = #q{backing_queue_state = BQS2}) -> - true = BQ:is_empty(BQS2), - {AckTag, BQS3} = BQ:publish_delivered( - Message, Props, SenderPid, BQS2), - {{Message, Delivered, AckTag}, - true, State2#q{backing_queue_state = BQS3}}; - (false, State2) -> - {{Message, Delivered, undefined}, - true, discard(Delivery, State2)} - end, false, State1); - true -> {true, State1} + case rabbit_queue_consumers:deliver( + fun (true) -> true = BQ:is_empty(BQS), + {AckTag, BQS1} = BQ:publish_delivered( + Message, Props, SenderPid, BQS), + {{Message, Delivered, AckTag}, + State#q{backing_queue_state = BQS1}}; + (false) -> {{Message, Delivered, undefined}, + discard(Delivery, State)} + end, qname(State), State#q.consumers) of + {delivered, ActiveConsumersChanged, State1, Consumers} -> + {delivered, maybe_notify_decorators( + ActiveConsumersChanged, + State1#q{consumers = Consumers})}; + {undelivered, ActiveConsumersChanged, Consumers} -> + {undelivered, maybe_notify_decorators( + ActiveConsumersChanged, + State#q{consumers = Consumers})} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, - Delivered, State) -> + Delivered, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), - case attempt_delivery(Delivery, Props, Delivered, State1) of - {true, State2} -> + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State2 = State1#q{backing_queue_state = BQS1}, + case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, + State2) of + true -> State2; + {delivered, State3} -> + State3; %% The next one is an optimisation - {false, State2 = #q{ttl = 0, dlx = undefined}} -> - discard(Delivery, State2); - {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - {Dropped, State3 = #q{backing_queue_state = BQS2}} = - maybe_drop_head(State2#q{backing_queue_state = BQS1}), - QLen = BQ:len(BQS2), + {undelivered, State3 = #q{ttl = 0, dlx = undefined}} -> + discard(Delivery, State3); + {undelivered, State3 = #q{backing_queue_state = BQS2}} -> + BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), + {Dropped, State4 = #q{backing_queue_state = BQS4}} = + maybe_drop_head(State3#q{backing_queue_state = BQS3}), + QLen = BQ:len(BQS4), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so %% we only do that if a new message that might have an @@ -512,9 +521,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% has no expiry and becomes the head of the queue then %% the call is unnecessary. case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of - {false, false, _} -> State3; - {true, true, undefined} -> State3; - {_, _, _} -> drop_expired_msgs(State3) + {false, false, _} -> State4; + {true, true, undefined} -> State4; + {_, _, _} -> drop_expired_msgs(State4) end end. @@ -566,13 +575,9 @@ requeue(AckTags, ChPid, State) -> possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of - unchanged -> - State; - {unblocked, UnblockedCTags, Consumers1} -> - State1 = State#q{consumers = Consumers1}, - [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}], - State1) || CTag <- UnblockedCTags], - run_message_queue(State1) + unchanged -> State; + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, State1) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -599,8 +604,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, end, State2 = State1#q{consumers = Consumers1, exclusive_consumer = Holder1}, - [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || - CTag <- ChCTags], + notify_decorators(State2), case should_auto_delete(State2) of true -> {stop, State2}; false -> {ok, requeue_and_run(ChAckTags, @@ -1034,8 +1038,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State1), OtherArgs), - notify_decorators( - basic_consume, [{consumer_tag, ConsumerTag}], State1), + notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1054,8 +1057,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State1 = State#q{consumers = Consumers1, exclusive_consumer = Holder1}, emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)), - notify_decorators( - basic_cancel, [{consumer_tag, ConsumerTag}], State1), + notify_decorators(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) @@ -1218,7 +1220,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, ChPid, State)); handle_cast(notify_decorators, State) -> - notify_decorators(refresh, [], State), + notify_decorators(State), noreply(State); handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 8eaac10d..83f68ed3 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -120,18 +120,18 @@ table_field_to_binary({FName, T, V}) -> [short_string_to_binary(FName) | field_value_to_binary(T, V)]. field_value_to_binary(longstr, V) -> [$S | long_string_to_binary(V)]; -field_value_to_binary(signedint, V) -> [$I | <<V:32/signed>>]; +field_value_to_binary(signedint, V) -> [$I, <<V:32/signed>>]; field_value_to_binary(decimal, V) -> {Before, After} = V, - [$D | [Before, <<After:32>>]]; -field_value_to_binary(timestamp, V) -> [$T | <<V:64>>]; + [$D, Before, <<After:32>>]; +field_value_to_binary(timestamp, V) -> [$T, <<V:64>>]; field_value_to_binary(table, V) -> [$F | table_to_binary(V)]; field_value_to_binary(array, V) -> [$A | array_to_binary(V)]; -field_value_to_binary(byte, V) -> [$b | <<V:8/unsigned>>]; -field_value_to_binary(double, V) -> [$d | <<V:64/float>>]; -field_value_to_binary(float, V) -> [$f | <<V:32/float>>]; -field_value_to_binary(long, V) -> [$l | <<V:64/signed>>]; -field_value_to_binary(short, V) -> [$s | <<V:16/signed>>]; -field_value_to_binary(bool, V) -> [$t | [if V -> 1; true -> 0 end]]; +field_value_to_binary(byte, V) -> [$b, <<V:8/unsigned>>]; +field_value_to_binary(double, V) -> [$d, <<V:64/float>>]; +field_value_to_binary(float, V) -> [$f, <<V:32/float>>]; +field_value_to_binary(long, V) -> [$l, <<V:64/signed>>]; +field_value_to_binary(short, V) -> [$s, <<V:16/signed>>]; +field_value_to_binary(bool, V) -> [$t, if V -> 1; true -> 0 end]; field_value_to_binary(binary, V) -> [$x | long_string_to_binary(V)]; field_value_to_binary(void, _V) -> [$V]. @@ -154,13 +154,13 @@ generate_array_iolist(Array) -> short_string_to_binary(String) -> Len = string_length(String), - if Len < 256 -> [<<Len:8>> | String]; + if Len < 256 -> [<<Len:8>>, String]; true -> exit(content_properties_shortstr_overflow) end. long_string_to_binary(String) -> Len = string_length(String), - [<<Len:32>> | String]. + [<<Len:32>>, String]. string_length(String) when is_binary(String) -> size(String); string_length(String) -> length(String). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4d778f94..eded8a90 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -694,12 +694,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, - requeue = Requeue}, - _, State) -> + requeue = Requeue}, _, State) -> reject(DeliveryTag, Requeue, Multiple, State); handle_method(#'basic.ack'{delivery_tag = DeliveryTag, - multiple = Multiple}, + multiple = Multiple}, _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, @@ -710,8 +709,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, State1#ch{tx = {Msgs, Acks1}} end}; -handle_method(#'basic.get'{queue = QueueNameBin, - no_ack = NoAck}, +handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, conn_pid = ConnPid, limiter = Limiter, @@ -749,9 +747,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{conn_pid = ConnPid, - limiter = Limiter, - consumer_mapping = ConsumerMapping}) -> + _, State = #ch{conn_pid = ConnPid, + limiter = Limiter, + consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -801,8 +799,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag]) end; -handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, - nowait = NoWait}, +handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, _, State = #ch{consumer_mapping = ConsumerMapping, queue_consumers = QCons}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, @@ -849,13 +846,13 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = 0}, _, - State = #ch{limiter = Limiter}) -> +handle_method(#'basic.qos'{prefetch_count = 0}, + _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, - State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> %% TODO queue:len(UAMQ) is not strictly right since that counts %% unacked messages from basic.get too. Pretty obscure though. Limiter1 = rabbit_limiter:limit_prefetch(Limiter, @@ -864,8 +861,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; handle_method(#'basic.recover_async'{requeue = true}, - _, State = #ch{unacked_message_q = UAMQ, - limiter = Limiter}) -> + _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> OkFun = fun () -> ok end, UAMQL = queue:to_list(UAMQ), foreach_per_queue( @@ -887,19 +883,18 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> Content, State), {reply, #'basic.recover_ok'{}, State1}; -handle_method(#'basic.reject'{delivery_tag = DeliveryTag, - requeue = Requeue}, +handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, _, State) -> reject(DeliveryTag, Requeue, false, State); -handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - type = TypeNameBin, - passive = false, - durable = Durable, +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, + type = TypeNameBin, + passive = false, + durable = Durable, auto_delete = AutoDelete, - internal = Internal, - nowait = NoWait, - arguments = Args}, + internal = Internal, + nowait = NoWait, + arguments = Args}, _, State = #ch{virtual_host = VHostPath}) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -932,17 +927,17 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - passive = true, - nowait = NoWait}, + passive = true, + nowait = NoWait}, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName), return_ok(State, NoWait, #'exchange.declare_ok'{}); -handle_method(#'exchange.delete'{exchange = ExchangeNameBin, +handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused, - nowait = NoWait}, + nowait = NoWait}, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_not_default_exchange(ExchangeName), @@ -958,19 +953,19 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, end; handle_method(#'exchange.bind'{destination = DestinationNameBin, - source = SourceNameBin, + source = SourceNameBin, routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> + nowait = NoWait, + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:add/2, SourceNameBin, exchange, DestinationNameBin, RoutingKey, Arguments, #'exchange.bind_ok'{}, NoWait, State); handle_method(#'exchange.unbind'{destination = DestinationNameBin, - source = SourceNameBin, + source = SourceNameBin, routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> + nowait = NoWait, + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, SourceNameBin, exchange, DestinationNameBin, RoutingKey, Arguments, #'exchange.unbind_ok'{}, NoWait, State); @@ -1062,10 +1057,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin, return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); -handle_method(#'queue.delete'{queue = QueueNameBin, +handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, - if_empty = IfEmpty, - nowait = NoWait}, + if_empty = IfEmpty, + nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), @@ -1087,25 +1082,24 @@ handle_method(#'queue.delete'{queue = QueueNameBin, #'queue.delete_ok'{message_count = PurgedMessageCount}) end; -handle_method(#'queue.bind'{queue = QueueNameBin, - exchange = ExchangeNameBin, +handle_method(#'queue.bind'{queue = QueueNameBin, + exchange = ExchangeNameBin, routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> + nowait = NoWait, + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:add/2, ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); -handle_method(#'queue.unbind'{queue = QueueNameBin, - exchange = ExchangeNameBin, +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, routing_key = RoutingKey, - arguments = Arguments}, _, State) -> + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); -handle_method(#'queue.purge'{queue = QueueNameBin, - nowait = NoWait}, +handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), @@ -1153,15 +1147,15 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'channel.flow'{active = true}, _, - State = #ch{limiter = Limiter}) -> +handle_method(#'channel.flow'{active = true}, + _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unblock(Limiter), {reply, #'channel.flow_ok'{active = true}, maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; -handle_method(#'channel.flow'{active = false}, _, - State = #ch{consumer_mapping = Consumers, - limiter = Limiter}) -> +handle_method(#'channel.flow'{active = false}, + _, State = #ch{consumer_mapping = Consumers, + limiter = Limiter}) -> case rabbit_limiter:is_blocked(Limiter) of true -> {noreply, maybe_send_flow_ok(State)}; false -> Limiter1 = rabbit_limiter:block(Limiter), @@ -1186,8 +1180,8 @@ handle_method(#'channel.flow'{active = false}, _, handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, - drain = Drain}, _, - State = #ch{consumer_mapping = Consumers}) -> + drain = Drain}, + _, State = #ch{consumer_mapping = Consumers}) -> case dict:find(CTag, Consumers) of {ok, Q} -> ok = rabbit_amqqueue:credit( Q, self(), CTag, Credit, Drain), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 10e00fa3..c33b3c74 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -245,9 +245,9 @@ can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, case is_consumer_blocked(L, CTag) of false -> case (State =/= active orelse safe_call(Pid, {can_send, self(), AckRequired}, true)) of - true -> {continue, L#qstate{ - credits = record_send_q(CTag, Credits)}}; - false -> {suspend, L#qstate{state = suspended}} + true -> Credits1 = decrement_credit(CTag, Credits), + {continue, L#qstate{credits = Credits1}}; + false -> {suspend, L#qstate{state = suspended}} end; true -> {suspend, L} end. @@ -271,9 +271,9 @@ is_suspended(#qstate{}) -> false. is_consumer_blocked(#qstate{credits = Credits}, CTag) -> case gb_trees:lookup(CTag, Credits) of + none -> false; {value, #credit{credit = C}} when C > 0 -> false; - {value, #credit{}} -> true; - none -> false + {value, #credit{}} -> true end. credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) -> @@ -305,7 +305,7 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> %% state for us (#qstate.credits), and maintain a fiction that the %% limiter is making the decisions... -record_send_q(CTag, Credits) -> +decrement_credit(CTag, Credits) -> case gb_trees:lookup(CTag, Credits) of {value, #credit{credit = Credit, drain = Drain}} -> update_credit(CTag, Credit - 1, Drain, Credits); diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index b54fdd2e..5a1613a7 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -17,7 +17,9 @@ -module(rabbit_nodes). -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, - is_running/2, is_process_running/2]). + is_running/2, is_process_running/2, fqdn_nodename/0]). + +-include_lib("kernel/include/inet.hrl"). -define(EPMD_TIMEOUT, 30000). @@ -35,6 +37,7 @@ -spec(cookie_hash/0 :: () -> string()). -spec(is_running/2 :: (node(), atom()) -> boolean()). -spec(is_process_running/2 :: (node(), atom()) -> boolean()). +-spec(fqdn_nodename/0 :: () -> binary()). -endif. @@ -107,3 +110,9 @@ is_process_running(Node, Process) -> undefined -> false; P when is_pid(P) -> true end. + +fqdn_nodename() -> + {ID, _} = rabbit_nodes:parts(node()), + {ok, Host} = inet:gethostname(), + {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host), + list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))). diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 702091dc..f06423f7 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,7 +18,7 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, - send_drained/0, deliver/5, record_ack/3, subtract_acks/2, + send_drained/0, deliver/3, record_ack/3, subtract_acks/2, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, utilisation/1]). @@ -76,14 +76,14 @@ 'not_found' | {[ack()], [rabbit_types:ctag()], state()}. -spec send_drained() -> 'ok'. --spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}), - boolean(), rabbit_amqqueue:name(), T, state()) -> - {boolean(), [{ch(), rabbit_types:ctag()}], T, state()}. +-spec deliver(fun ((boolean()) -> {fetch_result(), T}), + rabbit_amqqueue:name(), state()) -> + {'delivered', boolean(), T, state()} | + {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> - 'unchanged' | - {'unblocked', [rabbit_types:ctag()], state()}. + 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). -spec notify_sent_fun(non_neg_integer()) -> cr_fun(). -spec activate_limit_fun() -> cr_fun(). @@ -181,44 +181,41 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. -deliver(FetchFun, Stop, QName, S, State) -> - deliver(FetchFun, Stop, QName, [], S, State). +deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State). -deliver(_FetchFun, true, _QName, Blocked, S, State) -> - {true, Blocked, S, State}; -deliver( FetchFun, false, QName, Blocked, S, - State = #state{consumers = Consumers, use = Use}) -> +deliver(FetchFun, QName, ConsumersChanged, + State = #state{consumers = Consumers}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {false, Blocked, S, State#state{use = update_use(Use, inactive)}}; + {undelivered, ConsumersChanged, + State#state{use = update_use(State#state.use, inactive)}}; {{value, QEntry, Priority}, Tail} -> - {Stop, Blocked1, S1, Consumers1} = - deliver_to_consumer(FetchFun, QEntry, Priority, QName, - Blocked, S, Tail), - deliver(FetchFun, Stop, QName, Blocked1, S1, - State#state{consumers = Consumers1}) + case deliver_to_consumer(FetchFun, QEntry, QName) of + {delivered, R} -> + {delivered, ConsumersChanged, R, + State#state{consumers = priority_queue:in(QEntry, Priority, + Tail)}}; + undelivered -> + deliver(FetchFun, QName, true, + State#state{consumers = Tail}) + end end. -deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName, - Blocked, S, Consumers) -> +deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked], - {false, Blocked1, S, Consumers}; + undelivered; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), - Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked], - {false, Blocked1, S, Consumers}; + undelivered; {continue, Limiter} -> - {Stop, S1} = deliver_to_consumer( - FetchFun, Consumer, - C#cr{limiter = Limiter}, QName, S), - {Stop, Blocked, S1, - priority_queue:in(E, Priority, Consumers)} + {delivered, deliver_to_consumer( + FetchFun, Consumer, + C#cr{limiter = Limiter}, QName)} end end. @@ -228,8 +225,8 @@ deliver_to_consumer(FetchFun, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, - QName, S) -> - {{Message, IsDelivered, AckTag}, Stop, S1} = FetchFun(AckRequired, S), + QName) -> + {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of @@ -238,7 +235,7 @@ deliver_to_consumer(FetchFun, end, update_ch_record(C#cr{acktags = ChAckTags1, unsent_message_count = Count + 1}), - {Stop, S1}. + R. record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), @@ -290,7 +287,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ1}), {unblocked, - tags(Unblocked), State#state{consumers = priority_queue:join(Consumers, UnblockedQ), use = update_use(Use, active)}} end. diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 8f6375a5..6205e2dc 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -8,13 +8,6 @@ -ifdef(use_specs). --type(notify_event() :: 'consumer_blocked' | - 'consumer_unblocked' | - 'queue_empty' | - 'basic_consume' | - 'basic_cancel' | - 'refresh'). - -callback startup(rabbit_types:amqqueue()) -> 'ok'. -callback shutdown(rabbit_types:amqqueue()) -> 'ok'. @@ -24,7 +17,9 @@ -callback active_for(rabbit_types:amqqueue()) -> boolean(). --callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'. +%% called with Queue, MaxActivePriority, IsEmpty +-callback consumer_state_changed( + rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'. -else. @@ -32,7 +27,7 @@ behaviour_info(callbacks) -> [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, - {active_for, 1}, {notify, 3}]; + {active_for, 1}, {consumer_state_changed, 3}]; behaviour_info(_Other) -> undefined. |