diff options
author | Tim Watson <watson.timothy@gmail.com> | 2014-01-09 12:29:45 +0000 |
---|---|---|
committer | Tim Watson <watson.timothy@gmail.com> | 2014-01-09 12:29:45 +0000 |
commit | d8170d3894fd7214b481a05aa14de82a0958ed2e (patch) | |
tree | f08c1a6d9022c9c4f8f02e39d2a6ff2b463346ef | |
parent | d400309403571c32c6beeb2163feccf42e4f2bf6 (diff) | |
parent | 84016d39416c8b5b08783cdbf4fbbdaacdb82aeb (diff) | |
download | rabbitmq-server-d8170d3894fd7214b481a05aa14de82a0958ed2e.tar.gz |
Merge default into bug25827
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 616 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 61 | ||||
-rw-r--r-- | src/rabbit_binary_parser.erl | 28 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 102 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 22 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 11 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 422 | ||||
-rw-r--r-- | src/rabbit_queue_decorator.erl | 13 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 147 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 23 |
10 files changed, 810 insertions, 635 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4c4ebb25..8e31cac4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -20,7 +20,6 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 200). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). @@ -38,8 +37,7 @@ has_had_consumers, backing_queue, backing_queue_state, - active_consumers, - consumer_use, + consumers, expires, sync_timer_ref, rate_timer_ref, @@ -57,21 +55,6 @@ status }). --record(consumer, {tag, ack_required, args}). - -%% These are held in our process dictionary --record(cr, {ch_pid, - monitor_ref, - acktags, - consumer_count, - %% Queue of {ChPid, #consumer{}} for consumers which have - %% been blocked for any reason - blocked_consumers, - %% The limiter itself - limiter, - %% Internal flow control for queue -> writer - unsent_message_count}). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -143,15 +126,14 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3 = lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) end, State2, Deliveries), - notify_decorators(startup, [], State3), + notify_decorators(startup, State3), State3. init_state(Q) -> State = #q{q = Q, exclusive_consumer = none, has_had_consumers = false, - active_consumers = priority_queue:new(), - consumer_use = {inactive, now_micros(), 0, 0.0}, + consumers = rabbit_queue_consumers:new(), senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), status = running, @@ -208,7 +190,7 @@ declare(Recover, From, State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), - notify_decorators(startup, [], State), + notify_decorators(startup, State), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -238,17 +220,17 @@ matches(false, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. -notify_decorators(Event, Props, State) when Event =:= startup; - Event =:= shutdown -> - decorator_callback(qname(State), Event, Props); +maybe_notify_decorators(false, State) -> State; +maybe_notify_decorators(true, State) -> notify_decorators(State), State. + +notify_decorators(Event, State) -> decorator_callback(qname(State), Event, []). -notify_decorators(Event, Props, State = #q{active_consumers = ACs, - backing_queue = BQ, - backing_queue_state = BQS}) -> - decorator_callback( - qname(State), notify, - [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)}, - {is_empty, BQ:is_empty(BQS)} | Props]]). +notify_decorators(State = #q{consumers = Consumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> + P = rabbit_queue_consumers:max_active_priority(Consumers), + decorator_callback(qname(State), consumer_state_changed, + [P, BQ:is_empty(BQS)]). decorator_callback(QName, F, A) -> %% Look up again in case policy and hence decorators have changed @@ -322,7 +304,7 @@ init_max_length(MaxLen, State) -> State1. terminate_shutdown(Fun, State) -> - State1 = #q{backing_queue_state = BQS} = + State1 = #q{backing_queue_state = BQS, consumers = Consumers} = lists:foldl(fun (F, S) -> F(S) end, State, [fun stop_sync_timer/1, fun stop_rate_timer/1, @@ -332,9 +314,10 @@ terminate_shutdown(Fun, State) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), - notify_decorators(shutdown, [], State), - [emit_consumer_deleted(Ch, CTag, QName) - || {Ch, CTag, _} <- consumers(State1)], + notify_decorators(shutdown, State), + [emit_consumer_deleted(Ch, CTag, QName) || + {Ch, CTag, _, _} <- + rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -417,157 +400,19 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(State = #q{active_consumers = AC}) -> - true = (priority_queue:is_empty(AC) orelse is_empty(State)). +assert_invariant(State = #q{consumers = Consumers}) -> + true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)). is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). -lookup_ch(ChPid) -> - case get({ch, ChPid}) of - undefined -> not_found; - C -> C - end. - -ch_record(ChPid, LimiterPid) -> - Key = {ch, ChPid}, - case get(Key) of - undefined -> MonitorRef = erlang:monitor(process, ChPid), - Limiter = rabbit_limiter:client(LimiterPid), - C = #cr{ch_pid = ChPid, - monitor_ref = MonitorRef, - acktags = queue:new(), - consumer_count = 0, - blocked_consumers = priority_queue:new(), - limiter = Limiter, - unsent_message_count = 0}, - put(Key, C), - C; - C = #cr{} -> C - end. - -update_ch_record(C = #cr{consumer_count = ConsumerCount, - acktags = ChAckTags, - unsent_message_count = UnsentMessageCount}) -> - case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of - {true, 0, 0} -> ok = erase_ch_record(C); - _ -> ok = store_ch_record(C) - end, - C. - -store_ch_record(C = #cr{ch_pid = ChPid}) -> - put({ch, ChPid}, C), - ok. - -erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> - erlang:demonitor(MonitorRef), - erase({ch, ChPid}), - ok. - -all_ch_record() -> [C || {{ch, _}, C} <- get()]. - -block_consumer(C = #cr{blocked_consumers = Blocked}, - {_ChPid, #consumer{tag = CTag}} = QEntry, State) -> - update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}), - notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State). - -is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> - Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). - maybe_send_drained(WasEmpty, State) -> case (not WasEmpty) andalso is_empty(State) of - true -> notify_decorators(queue_empty, [], State), - [send_drained(C) || C <- all_ch_record()]; + true -> notify_decorators(State), + rabbit_queue_consumers:send_drained(); false -> ok end, State. -send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> - case rabbit_limiter:drained(Limiter) of - {[], Limiter} -> ok; - {CTagCredit, Limiter2} -> rabbit_channel:send_drained( - ChPid, CTagCredit), - update_ch_record(C#cr{limiter = Limiter2}) - end. - -deliver_msgs_to_consumers(_DeliverFun, true, State) -> - {true, State}; -deliver_msgs_to_consumers(DeliverFun, false, - State = #q{active_consumers = ActiveConsumers, - consumer_use = CUInfo}) -> - case priority_queue:out_p(ActiveConsumers) of - {empty, _} -> - {false, - State#q{consumer_use = update_consumer_use(CUInfo, inactive)}}; - {{value, QEntry, Priority}, Tail} -> - {Stop, State1} = deliver_msg_to_consumer( - DeliverFun, QEntry, Priority, - State#q{active_consumers = Tail}), - deliver_msgs_to_consumers(DeliverFun, Stop, State1) - end. - -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) -> - C = lookup_ch(ChPid), - case is_ch_blocked(C) of - true -> block_consumer(C, E, State), - {false, State}; - false -> case rabbit_limiter:can_send(C#cr.limiter, - Consumer#consumer.ack_required, - Consumer#consumer.tag) of - {suspend, Limiter} -> - block_consumer(C#cr{limiter = Limiter}, E, State), - {false, State}; - {continue, Limiter} -> - AC1 = priority_queue:in(E, Priority, - State#q.active_consumers), - deliver_msg_to_consumer0( - DeliverFun, Consumer, C#cr{limiter = Limiter}, - State#q{active_consumers = AC1}) - end - end. - -deliver_msg_to_consumer0(DeliverFun, - #consumer{tag = ConsumerTag, - ack_required = AckRequired}, - C = #cr{ch_pid = ChPid, - acktags = ChAckTags, - unsent_message_count = Count}, - State = #q{q = #amqqueue{name = QName}}) -> - {{Message, IsDelivered, AckTag}, Stop, State1} = - DeliverFun(AckRequired, State), - rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, - {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = case AckRequired of - true -> queue:in(AckTag, ChAckTags); - false -> ChAckTags - end, - update_ch_record(C#cr{acktags = ChAckTags1, - unsent_message_count = Count + 1}), - {Stop, State1}. - -deliver_from_queue_deliver(AckRequired, State) -> - {Result, State1} = fetch(AckRequired, State), - {Result, is_empty(State1), State1}. - -update_consumer_use({inactive, _, _, _} = CUInfo, inactive) -> - CUInfo; -update_consumer_use({active, _, _} = CUInfo, active) -> - CUInfo; -update_consumer_use({active, Since, Avg}, inactive) -> - Now = now_micros(), - {inactive, Now, Now - Since, Avg}; -update_consumer_use({inactive, Since, Active, Avg}, active) -> - Now = now_micros(), - {active, Now, consumer_use_avg(Active, Now - Since, Avg)}. - -consumer_use_avg(Active, Inactive, Avg) -> - Time = Inactive + Active, - Ratio = Active / Time, - Weight = erlang:min(1, Time / 1000000), - case Avg of - undefined -> Ratio; - _ -> Ratio * Weight + Avg * (1 - Weight) - end. - confirm_messages([], State) -> State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> @@ -613,54 +458,68 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State) -> - {_IsEmpty1, State1} = deliver_msgs_to_consumers( - fun deliver_from_queue_deliver/2, - is_empty(State), State), - State1. +run_message_queue(State) -> run_message_queue(false, State). -add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) -> - Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of - {_, P} -> P; - _ -> 0 - end, - priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers). +run_message_queue(ActiveConsumersChanged, State) -> + case is_empty(State) of + true -> maybe_notify_decorators(ActiveConsumersChanged, State); + false -> case rabbit_queue_consumers:deliver( + fun(AckRequired) -> fetch(AckRequired, State) end, + qname(State), State#q.consumers) of + {delivered, ActiveConsumersChanged1, State1, Consumers} -> + run_message_queue( + ActiveConsumersChanged or ActiveConsumersChanged1, + State1#q{consumers = Consumers}); + {undelivered, ActiveConsumersChanged1, Consumers} -> + maybe_notify_decorators( + ActiveConsumersChanged or ActiveConsumersChanged1, + State#q{consumers = Consumers}) + end + end. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - case BQ:is_duplicate(Message, BQS) of - {false, BQS1} -> - deliver_msgs_to_consumers( - fun (true, State1 = #q{backing_queue_state = BQS2}) -> - true = BQ:is_empty(BQS2), - {AckTag, BQS3} = BQ:publish_delivered( - Message, Props, SenderPid, BQS2), - {{Message, Delivered, AckTag}, - true, State1#q{backing_queue_state = BQS3}}; - (false, State1) -> - {{Message, Delivered, undefined}, - true, discard(Delivery, State1)} - end, false, State#q{backing_queue_state = BQS1}); - {true, BQS1} -> - {true, State#q{backing_queue_state = BQS1}} + case rabbit_queue_consumers:deliver( + fun (true) -> true = BQ:is_empty(BQS), + {AckTag, BQS1} = BQ:publish_delivered( + Message, Props, SenderPid, BQS), + {{Message, Delivered, AckTag}, + State#q{backing_queue_state = BQS1}}; + (false) -> {{Message, Delivered, undefined}, + discard(Delivery, State)} + end, qname(State), State#q.consumers) of + {delivered, ActiveConsumersChanged, State1, Consumers} -> + {delivered, maybe_notify_decorators( + ActiveConsumersChanged, + State1#q{consumers = Consumers})}; + {undelivered, ActiveConsumersChanged, Consumers} -> + {undelivered, maybe_notify_decorators( + ActiveConsumersChanged, + State#q{consumers = Consumers})} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, - Delivered, State) -> + Delivered, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), - case attempt_delivery(Delivery, Props, Delivered, State1) of - {true, State2} -> + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State2 = State1#q{backing_queue_state = BQS1}, + case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, + State2) of + true -> State2; + {delivered, State3} -> + State3; %% The next one is an optimisation - {false, State2 = #q{ttl = 0, dlx = undefined}} -> - discard(Delivery, State2); - {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - {Dropped, State3 = #q{backing_queue_state = BQS2}} = - maybe_drop_head(State2#q{backing_queue_state = BQS1}), - QLen = BQ:len(BQS2), + {undelivered, State3 = #q{ttl = 0, dlx = undefined}} -> + discard(Delivery, State3); + {undelivered, State3 = #q{backing_queue_state = BQS2}} -> + BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), + {Dropped, State4 = #q{backing_queue_state = BQS4}} = + maybe_drop_head(State3#q{backing_queue_state = BQS3}), + QLen = BQ:len(BQS4), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so %% we only do that if a new message that might have an @@ -669,9 +528,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% has no expiry and becomes the head of the queue then %% the call is unnecessary. case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of - {false, false, _} -> State3; - {true, true, undefined} -> State3; - {_, _, _} -> drop_expired_msgs(State3) + {false, false, _} -> State4; + {true, true, undefined} -> State4; + {_, _, _} -> drop_expired_msgs(State4) end end. @@ -721,85 +580,42 @@ requeue(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end). -remove_consumer(ChPid, ConsumerTag, Queue) -> - priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> - (CP /= ChPid) or (CTag /= ConsumerTag) - end, Queue). - -remove_consumers(ChPid, Queue, QName) -> - priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> - emit_consumer_deleted(ChPid, CTag, QName), - false; - (_) -> - true - end, Queue). - -possibly_unblock(State, ChPid, Update) -> - case lookup_ch(ChPid) of - not_found -> State; - C -> C1 = Update(C), - case is_ch_blocked(C) andalso not is_ch_blocked(C1) of - false -> update_ch_record(C1), - State; - true -> unblock(State, C1) - end - end. - -unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) -> - case lists:partition( - fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> - rabbit_limiter:is_consumer_blocked(Limiter, CTag) - end, priority_queue:to_list(C#cr.blocked_consumers)) of - {_, []} -> - update_ch_record(C), - State; - {Blocked, Unblocked} -> - BlockedQ = priority_queue:from_list(Blocked), - UnblockedQ = priority_queue:from_list(Unblocked), - update_ch_record(C#cr{blocked_consumers = BlockedQ}), - State1 = State#q{consumer_use = - update_consumer_use(CUInfo, active)}, - AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ), - State2 = State1#q{active_consumers = AC1}, - [notify_decorators( - consumer_unblocked, [{consumer_tag, CTag}], State2) || - {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], - run_message_queue(State2) +possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> + case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of + unchanged -> State; + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, State1) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, +handle_ch_down(DownPid, State = #q{consumers = Consumers, + exclusive_consumer = Holder, senders = Senders}) -> - Senders1 = case pmon:is_monitored(DownPid, Senders) of - false -> Senders; - true -> credit_flow:peer_down(DownPid), - pmon:demonitor(DownPid, Senders) - end, - case lookup_ch(DownPid) of + State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of + false -> Senders; + true -> credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) + end}, + case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of not_found -> - {ok, State#q{senders = Senders1}}; - C = #cr{ch_pid = ChPid, - acktags = ChAckTags, - blocked_consumers = Blocked} -> - QName = qname(State), - _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission - ok = erase_ch_record(C), - State1 = State#q{ - exclusive_consumer = case Holder of - {ChPid, _} -> none; - Other -> Other - end, - active_consumers = remove_consumers( - ChPid, State#q.active_consumers, - QName), - senders = Senders1}, - case should_auto_delete(State1) of - true -> {stop, State1}; - false -> {ok, requeue_and_run(queue:to_list(ChAckTags), - ensure_expiry_timer(State1))} + {ok, State1}; + {ChAckTags, ChCTags, Consumers1} -> + QName = qname(State1), + [emit_consumer_deleted(DownPid, CTag, QName) || CTag <- ChCTags], + Holder1 = case Holder of + {DownPid, _} -> none; + Other -> Other + end, + State2 = State1#q{consumers = Consumers1, + exclusive_consumer = Holder1}, + notify_decorators(State2), + case should_auto_delete(State2) of + true -> {stop, State2}; + false -> {ok, requeue_and_run(ChAckTags, + ensure_expiry_timer(State2))} end end. @@ -813,10 +629,7 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -consumer_count() -> - lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). - -is_unused(_State) -> consumer_count() == 0. +is_unused(_State) -> rabbit_queue_consumers:count() == 0. maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). @@ -828,23 +641,9 @@ backing_queue_timeout(State = #q{backing_queue = BQ, State#q{backing_queue_state = BQ:timeout(BQS)}. subtract_acks(ChPid, AckTags, State, Fun) -> - case lookup_ch(ChPid) of - not_found -> - State; - C = #cr{acktags = ChAckTags} -> - update_ch_record( - C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), - Fun(State) - end. - -subtract_acks([], [], AckQ) -> - AckQ; -subtract_acks([], Prefix, AckQ) -> - queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); -subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> - case queue:out(AckQ) of - {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); - {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of + not_found -> State; + ok -> Fun(State) end. message_properties(Message, Confirm, #q{ttl = TTL}) -> @@ -1065,21 +864,16 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:len(BQS); i(messages_unacknowledged, _) -> - lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]); + rabbit_queue_consumers:unacknowledged_message_count(); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged]]); i(consumers, _) -> - consumer_count(); -i(consumer_utilisation, #q{consumer_use = ConsumerUse}) -> - case consumer_count() of + rabbit_queue_consumers:count(); +i(consumer_utilisation, #q{consumers = Consumers}) -> + case rabbit_queue_consumers:count() of 0 -> ''; - _ -> case ConsumerUse of - {active, Since, Avg} -> - consumer_use_avg(now_micros() - Since, 0, Avg); - {inactive, Since, Active, Avg} -> - consumer_use_avg(Active, now_micros() - Since, Avg) - end + _ -> rabbit_queue_consumers:utilisation(Consumers) end; i(memory, _) -> {memory, M} = process_info(self(), memory), @@ -1105,17 +899,6 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). -consumers(#q{active_consumers = ActiveConsumers}) -> - lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, - consumers(ActiveConsumers, []), all_ch_record()). - -consumers(Consumers, Acc) -> - priority_queue:fold( - fun ({ChPid, Consumer}, _P, Acc1) -> - #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, - [{ChPid, CTag, Ack, Args} | Acc1] - end, Acc, Consumers). - emit_stats(State) -> emit_stats(State, []). @@ -1204,8 +987,8 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, State) -> - reply(consumers(State), State); +handle_call(consumers, _From, State = #q{consumers = Consumers}) -> + reply(rabbit_queue_consumers:all(Consumers), State); handle_call({deliver, Delivery, Delivered}, From, State) -> %% Synchronous, "mandatory" deliver mode. @@ -1230,91 +1013,58 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, case fetch(AckRequired, State1) of {empty, State2} -> reply(empty, State2); - {{Message, IsDelivered, AckTag}, State2} -> - State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = - case AckRequired of - true -> C = #cr{acktags = ChAckTags} = - ch_record(ChPid, LimiterPid), - ChAckTags1 = queue:in(AckTag, ChAckTags), - update_ch_record(C#cr{acktags = ChAckTags1}), - State2; - false -> State2 - end, + {{Message, IsDelivered, AckTag}, + #q{backing_queue = BQ, backing_queue_state = BQS} = State2} -> + case AckRequired of + true -> ok = rabbit_queue_consumers:record_ack( + ChPid, LimiterPid, AckTag); + false -> ok + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, BQ:len(BQS), Msg}, State3) + reply({ok, BQ:len(BQS), Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg}, - _From, State = #q{exclusive_consumer = Holder}) -> + _From, State = #q{consumers = Consumers, + exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of - in_use -> - reply({error, exclusive_consume_unavailable}, State); - ok -> - C = #cr{consumer_count = Count, - limiter = Limiter} = ch_record(ChPid, LimiterPid), - Limiter1 = case LimiterActive of - true -> rabbit_limiter:activate(Limiter); - false -> Limiter - end, - Limiter2 = case CreditArgs of - none -> Limiter1; - {Crd, Drain} -> rabbit_limiter:credit( - Limiter1, ConsumerTag, Crd, Drain) - end, - C1 = update_ch_record(C#cr{consumer_count = Count + 1, - limiter = Limiter2}), - case is_empty(State) of - true -> send_drained(C1); - false -> ok - end, - Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck, - args = OtherArgs}, - ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> Holder - end, - State1 = State#q{has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, - ok = maybe_send_reply(ChPid, OkMsg), - emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), OtherArgs), - AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers), - State2 = State1#q{active_consumers = AC1}, - notify_decorators( - basic_consume, [{consumer_tag, ConsumerTag}], State2), - reply(ok, run_message_queue(State2)) + in_use -> reply({error, exclusive_consume_unavailable}, State); + ok -> Consumers1 = rabbit_queue_consumers:add( + ChPid, ConsumerTag, NoAck, + LimiterPid, LimiterActive, + CreditArgs, OtherArgs, + is_empty(State), Consumers), + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> Holder + end, + State1 = State#q{consumers = Consumers1, + has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck, qname(State1), OtherArgs), + notify_decorators(State1), + reply(ok, run_message_queue(State1)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, - State = #q{exclusive_consumer = Holder}) -> + State = #q{consumers = Consumers, + exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), - case lookup_ch(ChPid) of + case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of not_found -> reply(ok, State); - C = #cr{consumer_count = Count, - limiter = Limiter, - blocked_consumers = Blocked} -> - emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), - Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), - Limiter1 = case Count of - 1 -> rabbit_limiter:deactivate(Limiter); - _ -> Limiter - end, - Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), - update_ch_record(C#cr{consumer_count = Count - 1, - limiter = Limiter2, - blocked_consumers = Blocked1}), - State1 = State#q{ - exclusive_consumer = case Holder of - {ChPid, ConsumerTag} -> none; - _ -> Holder - end, - active_consumers = remove_consumer( - ChPid, ConsumerTag, - State#q.active_consumers)}, - notify_decorators( - basic_cancel, [{consumer_tag, ConsumerTag}], State1), + Consumers1 -> + Holder1 = case Holder of + {ChPid, ConsumerTag} -> none; + _ -> Holder + end, + State1 = State#q{consumers = Consumers1, + exclusive_consumer = Holder1}, + emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)), + notify_decorators(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) @@ -1324,7 +1074,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_expiry_timer(State), - reply({ok, BQ:len(BQS), consumer_count()}, State1); + reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -1376,14 +1126,16 @@ handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State); handle_call(force_event_refresh, _From, - State = #q{exclusive_consumer = Exclusive}) -> + State = #q{consumers = Consumers, + exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), QName = qname(State), + AllConsumers = rabbit_queue_consumers:all(Consumers), case Exclusive of none -> [emit_consumer_created( Ch, CTag, false, AckRequired, QName, Args) || - {Ch, CTag, AckRequired, Args} <- consumers(State)]; - {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = consumers(State), + {Ch, CTag, AckRequired, Args} <- AllConsumers]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers, emit_consumer_created( Ch, CTag, true, AckRequired, QName, Args) end, @@ -1424,25 +1176,16 @@ handle_cast(delete_immediately, State) -> stop(State); handle_cast({resume, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C = #cr{limiter = Limiter}) -> - C#cr{limiter = rabbit_limiter:resume(Limiter)} - end)); + noreply(possibly_unblock(rabbit_queue_consumers:resume_fun(), + ChPid, State)); handle_cast({notify_sent, ChPid, Credit}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C = #cr{unsent_message_count = Count}) -> - C#cr{unsent_message_count = Count - Credit} - end)); + noreply(possibly_unblock(rabbit_queue_consumers:notify_sent_fun(Credit), + ChPid, State)); handle_cast({activate_limit, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C = #cr{limiter = Limiter}) -> - C#cr{limiter = rabbit_limiter:activate(Limiter)} - end)); + noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(), + ChPid, State)); handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), @@ -1458,42 +1201,33 @@ handle_cast({set_maximum_since_use, Age}, State) -> noreply(State); handle_cast(start_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS}) -> %% lookup again to get policy for init_with_existing_bq {ok, Q} = rabbit_amqqueue:lookup(qname(State)), true = BQ =/= rabbit_mirror_queue_master, %% assertion BQ1 = rabbit_mirror_queue_master, BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); + backing_queue_state = BQS1}); handle_cast(stop_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS}) -> BQ = rabbit_mirror_queue_master, %% assertion {BQ1, BQS1} = BQ:stop_mirroring(BQS), noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); + backing_queue_state = BQS1}); handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Len = BQ:len(BQS), rabbit_channel:send_credit_reply(ChPid, Len), - C = #cr{limiter = Limiter} = lookup_ch(ChPid), - C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)}, - noreply(case Drain andalso Len == 0 of - true -> update_ch_record(C1), - send_drained(C1), - State; - false -> case is_ch_blocked(C1) of - true -> update_ch_record(C1), - State; - false -> unblock(State, C1) - end - end); + noreply(possibly_unblock(rabbit_queue_consumers:credit_fun( + Len == 0, Credit, Drain, CTag), + ChPid, State)); handle_cast(notify_decorators, State) -> - notify_decorators(refresh, [], State), + notify_decorators(State), noreply(State); handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index ae5bbf51..83f68ed3 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -119,52 +119,51 @@ create_frame(TypeInt, ChannelInt, Payload) -> table_field_to_binary({FName, T, V}) -> [short_string_to_binary(FName) | field_value_to_binary(T, V)]. -field_value_to_binary(longstr, V) -> ["S", long_string_to_binary(V)]; -field_value_to_binary(signedint, V) -> ["I", <<V:32/signed>>]; +field_value_to_binary(longstr, V) -> [$S | long_string_to_binary(V)]; +field_value_to_binary(signedint, V) -> [$I, <<V:32/signed>>]; field_value_to_binary(decimal, V) -> {Before, After} = V, - ["D", Before, <<After:32>>]; -field_value_to_binary(timestamp, V) -> ["T", <<V:64>>]; -field_value_to_binary(table, V) -> ["F", table_to_binary(V)]; -field_value_to_binary(array, V) -> ["A", array_to_binary(V)]; -field_value_to_binary(byte, V) -> ["b", <<V:8/unsigned>>]; -field_value_to_binary(double, V) -> ["d", <<V:64/float>>]; -field_value_to_binary(float, V) -> ["f", <<V:32/float>>]; -field_value_to_binary(long, V) -> ["l", <<V:64/signed>>]; -field_value_to_binary(short, V) -> ["s", <<V:16/signed>>]; -field_value_to_binary(bool, V) -> ["t", if V -> 1; true -> 0 end]; -field_value_to_binary(binary, V) -> ["x", long_string_to_binary(V)]; -field_value_to_binary(void, _V) -> ["V"]. + [$D, Before, <<After:32>>]; +field_value_to_binary(timestamp, V) -> [$T, <<V:64>>]; +field_value_to_binary(table, V) -> [$F | table_to_binary(V)]; +field_value_to_binary(array, V) -> [$A | array_to_binary(V)]; +field_value_to_binary(byte, V) -> [$b, <<V:8/unsigned>>]; +field_value_to_binary(double, V) -> [$d, <<V:64/float>>]; +field_value_to_binary(float, V) -> [$f, <<V:32/float>>]; +field_value_to_binary(long, V) -> [$l, <<V:64/signed>>]; +field_value_to_binary(short, V) -> [$s, <<V:16/signed>>]; +field_value_to_binary(bool, V) -> [$t, if V -> 1; true -> 0 end]; +field_value_to_binary(binary, V) -> [$x | long_string_to_binary(V)]; +field_value_to_binary(void, _V) -> [$V]. table_to_binary(Table) when is_list(Table) -> - BinTable = generate_table(Table), - [<<(size(BinTable)):32>>, BinTable]. + BinTable = generate_table_iolist(Table), + [<<(iolist_size(BinTable)):32>> | BinTable]. array_to_binary(Array) when is_list(Array) -> - BinArray = generate_array(Array), - [<<(size(BinArray)):32>>, BinArray]. + BinArray = generate_array_iolist(Array), + [<<(iolist_size(BinArray)):32>> | BinArray]. generate_table(Table) when is_list(Table) -> - list_to_binary(lists:map(fun table_field_to_binary/1, Table)). + list_to_binary(generate_table_iolist(Table)). -generate_array(Array) when is_list(Array) -> - list_to_binary(lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end, - Array)). +generate_table_iolist(Table) -> + lists:map(fun table_field_to_binary/1, Table). + +generate_array_iolist(Array) -> + lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end, Array). -short_string_to_binary(String) when is_binary(String) -> - Len = size(String), - if Len < 256 -> [<<Len:8>>, String]; - true -> exit(content_properties_shortstr_overflow) - end; short_string_to_binary(String) -> - Len = length(String), + Len = string_length(String), if Len < 256 -> [<<Len:8>>, String]; true -> exit(content_properties_shortstr_overflow) end. -long_string_to_binary(String) when is_binary(String) -> - [<<(size(String)):32>>, String]; long_string_to_binary(String) -> - [<<(length(String)):32>>, String]. + Len = string_length(String), + [<<Len:32>>, String]. + +string_length(String) when is_binary(String) -> size(String); +string_length(String) -> length(String). check_empty_frame_size() -> %% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 088ad0e5..f65d8ea7 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -53,35 +53,35 @@ parse_array(<<ValueAndRest/binary>>) -> {Type, Value, Rest} = parse_field_value(ValueAndRest), [{Type, Value} | parse_array(Rest)]. -parse_field_value(<<"S", VLen:32/unsigned, V:VLen/binary, R/binary>>) -> +parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> {longstr, V, R}; -parse_field_value(<<"I", V:32/signed, R/binary>>) -> +parse_field_value(<<$I, V:32/signed, R/binary>>) -> {signedint, V, R}; -parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, R/binary>>) -> +parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) -> {decimal, {Before, After}, R}; -parse_field_value(<<"T", V:64/unsigned, R/binary>>) -> +parse_field_value(<<$T, V:64/unsigned, R/binary>>) -> {timestamp, V, R}; -parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> +parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> {table, parse_table(Table), R}; -parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> +parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> {array, parse_array(Array), R}; -parse_field_value(<<"b", V:8/unsigned, R/binary>>) -> {byte, V, R}; -parse_field_value(<<"d", V:64/float, R/binary>>) -> {double, V, R}; -parse_field_value(<<"f", V:32/float, R/binary>>) -> {float, V, R}; -parse_field_value(<<"l", V:64/signed, R/binary>>) -> {long, V, R}; -parse_field_value(<<"s", V:16/signed, R/binary>>) -> {short, V, R}; -parse_field_value(<<"t", V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R}; +parse_field_value(<<$b, V:8/unsigned, R/binary>>) -> {byte, V, R}; +parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R}; +parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R}; +parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R}; +parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R}; +parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R}; -parse_field_value(<<"x", VLen:32/unsigned, V:VLen/binary, R/binary>>) -> +parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> {binary, V, R}; -parse_field_value(<<"V", R/binary>>) -> +parse_field_value(<<$V, R/binary>>) -> {void, undefined, R}. ensure_content_decoded(Content = #content{properties = Props}) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4d778f94..eded8a90 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -694,12 +694,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, - requeue = Requeue}, - _, State) -> + requeue = Requeue}, _, State) -> reject(DeliveryTag, Requeue, Multiple, State); handle_method(#'basic.ack'{delivery_tag = DeliveryTag, - multiple = Multiple}, + multiple = Multiple}, _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, @@ -710,8 +709,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, State1#ch{tx = {Msgs, Acks1}} end}; -handle_method(#'basic.get'{queue = QueueNameBin, - no_ack = NoAck}, +handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, conn_pid = ConnPid, limiter = Limiter, @@ -749,9 +747,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{conn_pid = ConnPid, - limiter = Limiter, - consumer_mapping = ConsumerMapping}) -> + _, State = #ch{conn_pid = ConnPid, + limiter = Limiter, + consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -801,8 +799,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag]) end; -handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, - nowait = NoWait}, +handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, _, State = #ch{consumer_mapping = ConsumerMapping, queue_consumers = QCons}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, @@ -849,13 +846,13 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = 0}, _, - State = #ch{limiter = Limiter}) -> +handle_method(#'basic.qos'{prefetch_count = 0}, + _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, - State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> %% TODO queue:len(UAMQ) is not strictly right since that counts %% unacked messages from basic.get too. Pretty obscure though. Limiter1 = rabbit_limiter:limit_prefetch(Limiter, @@ -864,8 +861,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; handle_method(#'basic.recover_async'{requeue = true}, - _, State = #ch{unacked_message_q = UAMQ, - limiter = Limiter}) -> + _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> OkFun = fun () -> ok end, UAMQL = queue:to_list(UAMQ), foreach_per_queue( @@ -887,19 +883,18 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> Content, State), {reply, #'basic.recover_ok'{}, State1}; -handle_method(#'basic.reject'{delivery_tag = DeliveryTag, - requeue = Requeue}, +handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, _, State) -> reject(DeliveryTag, Requeue, false, State); -handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - type = TypeNameBin, - passive = false, - durable = Durable, +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, + type = TypeNameBin, + passive = false, + durable = Durable, auto_delete = AutoDelete, - internal = Internal, - nowait = NoWait, - arguments = Args}, + internal = Internal, + nowait = NoWait, + arguments = Args}, _, State = #ch{virtual_host = VHostPath}) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -932,17 +927,17 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - passive = true, - nowait = NoWait}, + passive = true, + nowait = NoWait}, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName), return_ok(State, NoWait, #'exchange.declare_ok'{}); -handle_method(#'exchange.delete'{exchange = ExchangeNameBin, +handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused, - nowait = NoWait}, + nowait = NoWait}, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_not_default_exchange(ExchangeName), @@ -958,19 +953,19 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, end; handle_method(#'exchange.bind'{destination = DestinationNameBin, - source = SourceNameBin, + source = SourceNameBin, routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> + nowait = NoWait, + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:add/2, SourceNameBin, exchange, DestinationNameBin, RoutingKey, Arguments, #'exchange.bind_ok'{}, NoWait, State); handle_method(#'exchange.unbind'{destination = DestinationNameBin, - source = SourceNameBin, + source = SourceNameBin, routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> + nowait = NoWait, + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, SourceNameBin, exchange, DestinationNameBin, RoutingKey, Arguments, #'exchange.unbind_ok'{}, NoWait, State); @@ -1062,10 +1057,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin, return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); -handle_method(#'queue.delete'{queue = QueueNameBin, +handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, - if_empty = IfEmpty, - nowait = NoWait}, + if_empty = IfEmpty, + nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), @@ -1087,25 +1082,24 @@ handle_method(#'queue.delete'{queue = QueueNameBin, #'queue.delete_ok'{message_count = PurgedMessageCount}) end; -handle_method(#'queue.bind'{queue = QueueNameBin, - exchange = ExchangeNameBin, +handle_method(#'queue.bind'{queue = QueueNameBin, + exchange = ExchangeNameBin, routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> + nowait = NoWait, + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:add/2, ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); -handle_method(#'queue.unbind'{queue = QueueNameBin, - exchange = ExchangeNameBin, +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, routing_key = RoutingKey, - arguments = Arguments}, _, State) -> + arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); -handle_method(#'queue.purge'{queue = QueueNameBin, - nowait = NoWait}, +handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), @@ -1153,15 +1147,15 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'channel.flow'{active = true}, _, - State = #ch{limiter = Limiter}) -> +handle_method(#'channel.flow'{active = true}, + _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unblock(Limiter), {reply, #'channel.flow_ok'{active = true}, maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; -handle_method(#'channel.flow'{active = false}, _, - State = #ch{consumer_mapping = Consumers, - limiter = Limiter}) -> +handle_method(#'channel.flow'{active = false}, + _, State = #ch{consumer_mapping = Consumers, + limiter = Limiter}) -> case rabbit_limiter:is_blocked(Limiter) of true -> {noreply, maybe_send_flow_ok(State)}; false -> Limiter1 = rabbit_limiter:block(Limiter), @@ -1186,8 +1180,8 @@ handle_method(#'channel.flow'{active = false}, _, handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, - drain = Drain}, _, - State = #ch{consumer_mapping = Consumers}) -> + drain = Drain}, + _, State = #ch{consumer_mapping = Consumers}) -> case dict:find(CTag, Consumers) of {ok, Q} -> ok = rabbit_amqqueue:credit( Q, self(), CTag, Credit, Drain), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 22da465b..2857ca55 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -126,7 +126,7 @@ get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, + is_suspended/1, is_consumer_blocked/2, credit/5, drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -168,8 +168,8 @@ -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). --spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean()) - -> qstate()). +-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), + boolean()) -> qstate()). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -245,9 +245,9 @@ can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, case is_consumer_blocked(L, CTag) of false -> case (State =/= active orelse safe_call(Pid, {can_send, self(), AckRequired}, true)) of - true -> {continue, L#qstate{ - credits = record_send_q(CTag, Credits)}}; - false -> {suspend, L#qstate{state = suspended}} + true -> Credits1 = decrement_credit(CTag, Credits), + {continue, L#qstate{credits = Credits1}}; + false -> {suspend, L#qstate{state = suspended}} end; true -> {suspend, L} end. @@ -271,12 +271,14 @@ is_suspended(#qstate{}) -> false. is_consumer_blocked(#qstate{credits = Credits}, CTag) -> case gb_trees:lookup(CTag, Credits) of + none -> false; {value, #credit{credit = C}} when C > 0 -> false; - {value, #credit{}} -> true; - none -> false + {value, #credit{}} -> true end. -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> +credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) -> + Limiter#qstate{credits = update_credit(CTag, 0, true, Credits)}; +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) -> Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. drained(Limiter = #qstate{credits = Credits}) -> @@ -303,7 +305,7 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> %% state for us (#qstate.credits), and maintain a fiction that the %% limiter is making the decisions... -record_send_q(CTag, Credits) -> +decrement_credit(CTag, Credits) -> case gb_trees:lookup(CTag, Credits) of {value, #credit{credit = Credit, drain = Drain}} -> update_credit(CTag, Credit - 1, Drain, Credits); diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index b54fdd2e..5a1613a7 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -17,7 +17,9 @@ -module(rabbit_nodes). -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, - is_running/2, is_process_running/2]). + is_running/2, is_process_running/2, fqdn_nodename/0]). + +-include_lib("kernel/include/inet.hrl"). -define(EPMD_TIMEOUT, 30000). @@ -35,6 +37,7 @@ -spec(cookie_hash/0 :: () -> string()). -spec(is_running/2 :: (node(), atom()) -> boolean()). -spec(is_process_running/2 :: (node(), atom()) -> boolean()). +-spec(fqdn_nodename/0 :: () -> binary()). -endif. @@ -107,3 +110,9 @@ is_process_running(Node, Process) -> undefined -> false; P when is_pid(P) -> true end. + +fqdn_nodename() -> + {ID, _} = rabbit_nodes:parts(node()), + {ok, Host} = inet:gethostname(), + {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host), + list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))). diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl new file mode 100644 index 00000000..f06423f7 --- /dev/null +++ b/src/rabbit_queue_consumers.erl @@ -0,0 +1,422 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_queue_consumers). + +-export([new/0, max_active_priority/1, inactive/1, all/1, count/0, + unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, + send_drained/0, deliver/3, record_ack/3, subtract_acks/2, + possibly_unblock/3, + resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, + utilisation/1]). + +%%---------------------------------------------------------------------------- + +-define(UNSENT_MESSAGE_LIMIT, 200). + +-record(state, {consumers, use}). + +-record(consumer, {tag, ack_required, args}). + +%% These are held in our process dictionary +-record(cr, {ch_pid, + monitor_ref, + acktags, + consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason + blocked_consumers, + %% The limiter itself + limiter, + %% Internal flow control for queue -> writer + unsent_message_count}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type time_micros() :: non_neg_integer(). +-type ratio() :: float(). +-type state() :: #state{consumers ::priority_queue:q(), + use :: {'inactive', + time_micros(), time_micros(), ratio()} | + {'active', time_micros(), ratio()}}. +-type ch() :: pid(). +-type ack() :: non_neg_integer(). +-type cr_fun() :: fun ((#cr{}) -> #cr{}). +-type credit_args() :: {non_neg_integer(), boolean()} | 'none'. +-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. + +-spec new() -> state(). +-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. +-spec inactive(state()) -> boolean(). +-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table()}]. +-spec count() -> non_neg_integer(). +-spec unacknowledged_message_count() -> non_neg_integer(). +-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), + credit_args(), rabbit_framing:amqp_table(), boolean(), + state()) -> state(). +-spec remove(ch(), rabbit_types:ctag(), state()) -> + 'not_found' | state(). +-spec erase_ch(ch(), state()) -> + 'not_found' | {[ack()], [rabbit_types:ctag()], + state()}. +-spec send_drained() -> 'ok'. +-spec deliver(fun ((boolean()) -> {fetch_result(), T}), + rabbit_amqqueue:name(), state()) -> + {'delivered', boolean(), T, state()} | + {'undelivered', boolean(), state()}. +-spec record_ack(ch(), pid(), ack()) -> 'ok'. +-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. +-spec possibly_unblock(cr_fun(), ch(), state()) -> + 'unchanged' | {'unblocked', state()}. +-spec resume_fun() -> cr_fun(). +-spec notify_sent_fun(non_neg_integer()) -> cr_fun(). +-spec activate_limit_fun() -> cr_fun(). +-spec credit_fun(boolean(), non_neg_integer(), boolean(), + rabbit_types:ctag()) -> cr_fun(). +-spec utilisation(state()) -> ratio(). + +-endif. + +%%---------------------------------------------------------------------------- + +new() -> #state{consumers = priority_queue:new(), + use = {inactive, now_micros(), 0, 0.0}}. + +max_active_priority(#state{consumers = Consumers}) -> + priority_queue:highest(Consumers). + +inactive(#state{consumers = Consumers}) -> + priority_queue:is_empty(Consumers). + +all(#state{consumers = Consumers}) -> + lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, + consumers(Consumers, []), all_ch_record()). + +consumers(Consumers, Acc) -> + priority_queue:fold( + fun ({ChPid, Consumer}, _P, Acc1) -> + #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, + [{ChPid, CTag, Ack, Args} | Acc1] + end, Acc, Consumers). + +count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). + +unacknowledged_message_count() -> + lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). + +add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, + IsEmpty, State = #state{consumers = Consumers}) -> + C = #cr{consumer_count = Count, + limiter = Limiter} = ch_record(ChPid, LimiterPid), + Limiter1 = case LimiterActive of + true -> rabbit_limiter:activate(Limiter); + false -> Limiter + end, + Limiter2 = case CreditArgs of + none -> Limiter1; + {Crd, Drain} -> rabbit_limiter:credit( + Limiter1, ConsumerTag, Crd, IsEmpty, Drain) + end, + C1 = C#cr{consumer_count = Count + 1, + limiter = Limiter2}, + update_ch_record(case IsEmpty of + true -> send_drained(C1); + false -> C1 + end), + Consumer = #consumer{tag = ConsumerTag, + ack_required = not NoAck, + args = OtherArgs}, + State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. + +remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) -> + case lookup_ch(ChPid) of + not_found -> + not_found; + C = #cr{consumer_count = Count, + limiter = Limiter, + blocked_consumers = Blocked} -> + Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), + Limiter1 = case Count of + 1 -> rabbit_limiter:deactivate(Limiter); + _ -> Limiter + end, + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), + update_ch_record(C#cr{consumer_count = Count - 1, + limiter = Limiter2, + blocked_consumers = Blocked1}), + State#state{consumers = + remove_consumer(ChPid, ConsumerTag, Consumers)} + end. + +erase_ch(ChPid, State = #state{consumers = Consumers}) -> + case lookup_ch(ChPid) of + not_found -> + not_found; + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + blocked_consumers = BlockedQ} -> + AllConsumers = priority_queue:join(Consumers, BlockedQ), + ok = erase_ch_record(C), + {queue:to_list(ChAckTags), + tags(priority_queue:to_list(AllConsumers)), + State#state{consumers = remove_consumers(ChPid, Consumers)}} + end. + +send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], + ok. + +deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State). + +deliver(FetchFun, QName, ConsumersChanged, + State = #state{consumers = Consumers}) -> + case priority_queue:out_p(Consumers) of + {empty, _} -> + {undelivered, ConsumersChanged, + State#state{use = update_use(State#state.use, inactive)}}; + {{value, QEntry, Priority}, Tail} -> + case deliver_to_consumer(FetchFun, QEntry, QName) of + {delivered, R} -> + {delivered, ConsumersChanged, R, + State#state{consumers = priority_queue:in(QEntry, Priority, + Tail)}}; + undelivered -> + deliver(FetchFun, QName, true, + State#state{consumers = Tail}) + end + end. + +deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) -> + C = lookup_ch(ChPid), + case is_ch_blocked(C) of + true -> block_consumer(C, E), + undelivered; + false -> case rabbit_limiter:can_send(C#cr.limiter, + Consumer#consumer.ack_required, + Consumer#consumer.tag) of + {suspend, Limiter} -> + block_consumer(C#cr{limiter = Limiter}, E), + undelivered; + {continue, Limiter} -> + {delivered, deliver_to_consumer( + FetchFun, Consumer, + C#cr{limiter = Limiter}, QName)} + end + end. + +deliver_to_consumer(FetchFun, + #consumer{tag = ConsumerTag, + ack_required = AckRequired}, + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + unsent_message_count = Count}, + QName) -> + {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired), + rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + {QName, self(), AckTag, IsDelivered, Message}), + ChAckTags1 = case AckRequired of + true -> queue:in(AckTag, ChAckTags); + false -> ChAckTags + end, + update_ch_record(C#cr{acktags = ChAckTags1, + unsent_message_count = Count + 1}), + R. + +record_ack(ChPid, LimiterPid, AckTag) -> + C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), + update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), + ok. + +subtract_acks(ChPid, AckTags) -> + case lookup_ch(ChPid) of + not_found -> + not_found; + C = #cr{acktags = ChAckTags} -> + update_ch_record( + C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), + ok + end. + +subtract_acks([], [], AckQ) -> + AckQ; +subtract_acks([], Prefix, AckQ) -> + queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); +subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> + case queue:out(AckQ) of + {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); + {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + end. + +possibly_unblock(Update, ChPid, State) -> + case lookup_ch(ChPid) of + not_found -> unchanged; + C -> C1 = Update(C), + case is_ch_blocked(C) andalso not is_ch_blocked(C1) of + false -> update_ch_record(C1), + unchanged; + true -> unblock(C1, State) + end + end. + +unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, + State = #state{consumers = Consumers, use = Use}) -> + case lists:partition( + fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> + rabbit_limiter:is_consumer_blocked(Limiter, CTag) + end, priority_queue:to_list(BlockedQ)) of + {_, []} -> + update_ch_record(C), + unchanged; + {Blocked, Unblocked} -> + BlockedQ1 = priority_queue:from_list(Blocked), + UnblockedQ = priority_queue:from_list(Unblocked), + update_ch_record(C#cr{blocked_consumers = BlockedQ1}), + {unblocked, + State#state{consumers = priority_queue:join(Consumers, UnblockedQ), + use = update_use(Use, active)}} + end. + +resume_fun() -> + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:resume(Limiter)} + end. + +notify_sent_fun(Credit) -> + fun (C = #cr{unsent_message_count = Count}) -> + C#cr{unsent_message_count = Count - Credit} + end. + +activate_limit_fun() -> + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:activate(Limiter)} + end. + +credit_fun(IsEmpty, Credit, Drain, CTag) -> + fun (C = #cr{limiter = Limiter}) -> + C1 = C#cr{limiter = rabbit_limiter:credit( + Limiter, CTag, Credit, IsEmpty, Drain)}, + case Drain andalso IsEmpty of + true -> send_drained(C1); + false -> C1 + end + end. + +utilisation(#state{use = {active, Since, Avg}}) -> + use_avg(now_micros() - Since, 0, Avg); +utilisation(#state{use = {inactive, Since, Active, Avg}}) -> + use_avg(Active, now_micros() - Since, Avg). + +%%---------------------------------------------------------------------------- + +lookup_ch(ChPid) -> + case get({ch, ChPid}) of + undefined -> not_found; + C -> C + end. + +ch_record(ChPid, LimiterPid) -> + Key = {ch, ChPid}, + case get(Key) of + undefined -> MonitorRef = erlang:monitor(process, ChPid), + Limiter = rabbit_limiter:client(LimiterPid), + C = #cr{ch_pid = ChPid, + monitor_ref = MonitorRef, + acktags = queue:new(), + consumer_count = 0, + blocked_consumers = priority_queue:new(), + limiter = Limiter, + unsent_message_count = 0}, + put(Key, C), + C; + C = #cr{} -> C + end. + +update_ch_record(C = #cr{consumer_count = ConsumerCount, + acktags = ChAckTags, + unsent_message_count = UnsentMessageCount}) -> + case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of + {true, 0, 0} -> ok = erase_ch_record(C); + _ -> ok = store_ch_record(C) + end, + C. + +store_ch_record(C = #cr{ch_pid = ChPid}) -> + put({ch, ChPid}, C), + ok. + +erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> + erlang:demonitor(MonitorRef), + erase({ch, ChPid}), + ok. + +all_ch_record() -> [C || {{ch, _}, C} <- get()]. + +block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> + update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}). + +is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> + Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). + +send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> + case rabbit_limiter:drained(Limiter) of + {[], Limiter} -> C; + {CTagCredit, Limiter2} -> rabbit_channel:send_drained( + ChPid, CTagCredit), + C#cr{limiter = Limiter2} + end. + +tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList]. + +add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) -> + Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of + {_, P} -> P; + _ -> 0 + end, + priority_queue:in({ChPid, Consumer}, Priority, Queue). + +remove_consumer(ChPid, ConsumerTag, Queue) -> + priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> + (CP /= ChPid) or (CTag /= ConsumerTag) + end, Queue). + +remove_consumers(ChPid, Queue) -> + priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false; + (_) -> true + end, Queue). + +update_use({inactive, _, _, _} = CUInfo, inactive) -> + CUInfo; +update_use({active, _, _} = CUInfo, active) -> + CUInfo; +update_use({active, Since, Avg}, inactive) -> + Now = now_micros(), + {inactive, Now, Now - Since, Avg}; +update_use({inactive, Since, Active, Avg}, active) -> + Now = now_micros(), + {active, Now, use_avg(Active, Now - Since, Avg)}. + +use_avg(Active, Inactive, Avg) -> + Time = Inactive + Active, + Ratio = Active / Time, + Weight = erlang:min(1, Time / 1000000), + case Avg of + undefined -> Ratio; + _ -> Ratio * Weight + Avg * (1 - Weight) + end. + +now_micros() -> timer:now_diff(now(), {0,0,0}). diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 8f6375a5..6205e2dc 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -8,13 +8,6 @@ -ifdef(use_specs). --type(notify_event() :: 'consumer_blocked' | - 'consumer_unblocked' | - 'queue_empty' | - 'basic_consume' | - 'basic_cancel' | - 'refresh'). - -callback startup(rabbit_types:amqqueue()) -> 'ok'. -callback shutdown(rabbit_types:amqqueue()) -> 'ok'. @@ -24,7 +17,9 @@ -callback active_for(rabbit_types:amqqueue()) -> boolean(). --callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'. +%% called with Queue, MaxActivePriority, IsEmpty +-callback consumer_state_changed( + rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'. -else. @@ -32,7 +27,7 @@ behaviour_info(callbacks) -> [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, - {active_for, 1}, {notify, 3}]; + {active_for, 1}, {consumer_state_changed, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index da22e092..d9879f1b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -23,7 +23,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/2, mainloop/2, recvloop/2]). +-export([init/2, mainloop/4, recvloop/4]). -export([conserve_resources/3, server_properties/1]). @@ -38,8 +38,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, helper_sup, queue_collector, heartbeater, - stats_timer, channel_sup_sup_pid, buf, buf_len, channel_count, - throttle}). + stats_timer, channel_sup_sup_pid, channel_count, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, channel_max, vhost, @@ -92,9 +91,10 @@ rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). --spec(mainloop/2 :: (_,#v1{}) -> any()). +-spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()). -spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). --spec(system_continue/3 :: (_,_,#v1{}) -> any()). +-spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) -> + any()). -spec(system_terminate/4 :: (_,_,_,_) -> none()). -endif. @@ -114,8 +114,8 @@ init(Parent, HelperSup) -> start_connection(Parent, HelperSup, Deb, Sock, SockTransform) end. -system_continue(Parent, Deb, State) -> - mainloop(Deb, State#v1{parent = Parent}). +system_continue(Parent, Deb, {Buf, BufLen, State}) -> + mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -239,8 +239,6 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> helper_sup = HelperSup, heartbeater = none, channel_sup_sup_pid = none, - buf = [], - buf_len = 0, channel_count = 0, throttle = #throttle{ alarmed_by = [], @@ -249,9 +247,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> blocked_sent = false}}, try run({?MODULE, recvloop, - [Deb, switch_callback(rabbit_event:init_stats_timer( - State, #v1.stats_timer), - handshake, 8)]}), + [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)]}), log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of @@ -278,29 +276,38 @@ run({M, F, A}) -> catch {become, MFA} -> run(MFA) end. -recvloop(Deb, State = #v1{pending_recv = true}) -> - mainloop(Deb, State); -recvloop(Deb, State = #v1{connection_state = blocked}) -> - mainloop(Deb, State); -recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen}) +recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) -> + mainloop(Deb, Buf, BufLen, State); +recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) -> + mainloop(Deb, Buf, BufLen, State); +recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> + throw({become, F(Deb, Buf, BufLen, State)}); +recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) when BufLen < RecvLen -> ok = rabbit_net:setopts(Sock, [{active, once}]), - mainloop(Deb, State#v1{pending_recv = true}); -recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> - {Data, Rest} = split_binary(case Buf of - [B] -> B; - _ -> list_to_binary(lists:reverse(Buf)) - end, RecvLen), - recvloop(Deb, handle_input(State#v1.callback, Data, - State#v1{buf = [Rest], - buf_len = BufLen - RecvLen})). - -mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> + mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}); +recvloop(Deb, [B], _BufLen, State) -> + {Rest, State1} = handle_input(State#v1.callback, B, State), + recvloop(Deb, [Rest], size(Rest), State1); +recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) -> + {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []), + Data = list_to_binary(lists:reverse(DataLRev)), + {<<>>, State1} = handle_input(State#v1.callback, Data, State), + recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1). + +binlist_split(0, L, Acc) -> + {L, Acc}; +binlist_split(Len, L, [Acc0|Acc]) when Len < 0 -> + {H, T} = split_binary(Acc0, -Len), + {[H|L], [T|Acc]}; +binlist_split(Len, [H|T], Acc) -> + binlist_split(Len - size(H), T, [H|Acc]). + +mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> case rabbit_net:recv(Sock) of {data, Data} -> - recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); + recvloop(Deb, [Data | Buf], BufLen + size(Data), + State#v1{pending_recv = false}); closed when State#v1.connection_state =:= closed -> ok; closed -> @@ -311,11 +318,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> throw({inet_error, Reason}); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, - ?MODULE, Deb, State); + ?MODULE, Deb, {Buf, BufLen, State}); {other, Other} -> case handle_other(Other, State) of stop -> ok; - NewState -> recvloop(Deb, NewState) + NewState -> recvloop(Deb, Buf, BufLen, NewState) end end. @@ -715,26 +722,38 @@ post_process_frame(_Frame, _ChPid, State) -> %% a few get it wrong - off-by 1 or 8 (empty frame size) are typical. -define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE). -handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>, State = #v1{connection = #connection{frame_max = FrameMax}}) when FrameMax /= 0 andalso PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE -> fatal_frame_error( {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, Type, Channel, <<>>, State); -handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> - ensure_stats_timer( - switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, - PayloadSize + 1)); - +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, + Payload:PayloadSize/binary, ?FRAME_END, + Rest/binary>>, + State) -> + {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))}; +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>, + State) -> + {Rest, ensure_stats_timer( + switch_callback(State, + {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1))}; handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> - <<Payload:PayloadSize/binary, EndMarker>> = Data, + <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data, case EndMarker of ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), - switch_callback(State1, frame_header, 7); + {Rest, switch_callback(State1, frame_header, 7)}; _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker}, Type, Channel, Payload, State) end; +handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) -> + {Rest, handshake({A, B, C, D}, State)}; +handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_header, Other}); +handle_input(Callback, Data, _State) -> + throw({bad_input, Callback, Data}). %% The two rules pertaining to version negotiation: %% @@ -744,37 +763,31 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> %% %% * The server MUST provide a protocol version that is lower than or %% equal to that requested by the client in the protocol header. -handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) -> +handshake({0, 0, 9, 1}, State) -> start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); %% This is the protocol header for 0-9, which we can safely treat as %% though it were 0-9-1. -handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) -> +handshake({1, 1, 0, 9}, State) -> start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); %% This is what most clients send for 0-8. The 0-8 spec, confusingly, %% defines the version as 8-0. -handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> +handshake({1, 1, 8, 0}, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); %% The 0-8 spec as on the AMQP web site actually has this as the %% protocol header; some libraries e.g., py-amqplib, send it when they %% want 0-8. -handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> +handshake({1, 1, 9, 1}, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); -%% ... and finally, the 1.0 spec is crystal clear! Note that the -handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) -> +%% ... and finally, the 1.0 spec is crystal clear! +handshake({Id, 1, 0, 0}, State) -> become_1_0(Id, State); -handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> - refuse_connection(Sock, {bad_version, {A, B, C, D}}); - -handle_input(handshake, Other, #v1{sock = Sock}) -> - refuse_connection(Sock, {bad_header, Other}); - -handle_input(Callback, Data, _State) -> - throw({bad_input, Callback, Data}). +handshake(Vsn, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_version, Vsn}). %% Offer a protocol version to the client. Connection.start only %% includes a major and minor version number, Luckily 0-9 and 0-9-1 @@ -818,7 +831,10 @@ handle_method0(MethodName, FieldsBin, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) - catch exit:#amqp_error{method = none} = Reason -> + catch throw:{inet_error, closed} -> + maybe_emit_stats(State), + throw(connection_closed_abruptly); + exit:#amqp_error{method = none} = Reason -> handle_exception(State, 0, Reason#amqp_error{method = MethodName}); Type:Reason -> Stack = erlang:get_stacktrace(), @@ -1123,15 +1139,16 @@ become_1_0(Id, State = #v1{sock = Sock}) -> Sock, {unsupported_amqp1_0_protocol_id, Id}, {3, 1, 0, 0}) end, - throw({become, {rabbit_amqp1_0_reader, init, - [Mode, pack_for_1_0(State)]}}) + F = fun (_Deb, Buf, BufLen, S) -> + {rabbit_amqp1_0_reader, init, + [Mode, pack_for_1_0(Buf, BufLen, S)]} + end, + State = #v1{connection_state = {become, F}} end. -pack_for_1_0(#v1{parent = Parent, - sock = Sock, - recv_len = RecvLen, - pending_recv = PendingRecv, - helper_sup = SupPid, - buf = Buf, - buf_len = BufLen}) -> +pack_for_1_0(Buf, BufLen, #v1{parent = Parent, + sock = Sock, + recv_len = RecvLen, + pending_recv = PendingRecv, + helper_sup = SupPid}) -> {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}. diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 369ec655..fc4353dc 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -173,16 +173,19 @@ set_mem_limits(State, MemFraction) -> ?MEMORY_SIZE_FOR_UNKNOWN_OS; M -> M end, - UsableMemory = case get_vm_limit() of - Limit when Limit < TotalMemory -> - error_logger:warning_msg( - "Only ~pMB of ~pMB memory usable due to " - "limited address space.~n", - [trunc(V/?ONE_MB) || V <- [Limit, TotalMemory]]), - Limit; - _ -> - TotalMemory - end, + UsableMemory = + case get_vm_limit() of + Limit when Limit < TotalMemory -> + error_logger:warning_msg( + "Only ~pMB of ~pMB memory usable due to " + "limited address space.~n" + "Crashes due to memory exhaustion are possible - see~n" + "http://www.rabbitmq.com/memory.html#address-space~n", + [trunc(V/?ONE_MB) || V <- [Limit, TotalMemory]]), + Limit; + _ -> + TotalMemory + end, MemLim = trunc(MemFraction * UsableMemory), error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n", [trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]), |