From a6cc463997862e19b81582e51b1a44c243dfbb0c Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 24 Jul 2013 14:05:33 +0100 Subject: Don't emit random bits of the queue's internal data structures in events. --- src/priority_queue.erl | 4 ++-- src/rabbit_amqqueue_process.erl | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 8ded389b..c92b6564 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -66,7 +66,7 @@ -spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()). -spec(fold/3 :: (fun ((any(), any()) -> any()), any(), pqueue()) -> any()). --spec(highest/1 :: (pqueue()) -> priority()). +-spec(highest/1 :: (pqueue()) -> priority() | 'empty'). -endif. @@ -212,7 +212,7 @@ fold(Fun, Init, Q) -> case out_p(Q) of {{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1) end. -highest({queue, [], [], 0}) -> exit(highest_priority_of_empty_queue); +highest({queue, [], [], 0}) -> empty; highest({queue, _, _, _}) -> 0; highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6624b5d2..136168f6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -236,8 +236,9 @@ notify_decorators(Event, Props, State = #q{active_consumers = ACs, backing_queue = BQ, backing_queue_state = BQS}) -> callback(qname(State), notify, - [Event, [{active_consumers, ACs}, - {is_empty, BQ:is_empty(BQS)} | Props]]). + [Event, + [{max_active_consumer_priority, priority_queue:highest(ACs)}, + {is_empty, BQ:is_empty(BQS)} | Props]]). bq_init(BQ, Q, Recover) -> Self = self(), @@ -423,10 +424,11 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry, State) -> +block_consumer(C = #cr{blocked_consumers = Blocked}, + {_ChPid, #consumer{tag = CTag}} = QEntry, State) -> Blocked1 = priority_queue:in(QEntry, consumer_priority(QEntry), Blocked), update_ch_record(C#cr{blocked_consumers = Blocked1}), - notify_decorators(consumer_blocked, [{q_entry, QEntry}], State). + notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State). is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). -- cgit v1.2.1