summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-24 14:05:33 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-24 14:05:33 +0100
commita6cc463997862e19b81582e51b1a44c243dfbb0c (patch)
tree297ce718cf7ff68ea964db443c30c677a69c2e65
parent064d8ee7b5fd14dcc8fa21583dd0f8f39cca8fe5 (diff)
downloadrabbitmq-server-a6cc463997862e19b81582e51b1a44c243dfbb0c.tar.gz
Don't emit random bits of the queue's internal data structures in events.
-rw-r--r--src/priority_queue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl10
2 files changed, 8 insertions, 6 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 8ded389b..c92b6564 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -66,7 +66,7 @@
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()).
-spec(fold/3 :: (fun ((any(), any()) -> any()), any(), pqueue()) -> any()).
--spec(highest/1 :: (pqueue()) -> priority()).
+-spec(highest/1 :: (pqueue()) -> priority() | 'empty').
-endif.
@@ -212,7 +212,7 @@ fold(Fun, Init, Q) -> case out_p(Q) of
{{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1)
end.
-highest({queue, [], [], 0}) -> exit(highest_priority_of_empty_queue);
+highest({queue, [], [], 0}) -> empty;
highest({queue, _, _, _}) -> 0;
highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6624b5d2..136168f6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -236,8 +236,9 @@ notify_decorators(Event, Props, State = #q{active_consumers = ACs,
backing_queue = BQ,
backing_queue_state = BQS}) ->
callback(qname(State), notify,
- [Event, [{active_consumers, ACs},
- {is_empty, BQ:is_empty(BQS)} | Props]]).
+ [Event,
+ [{max_active_consumer_priority, priority_queue:highest(ACs)},
+ {is_empty, BQ:is_empty(BQS)} | Props]]).
bq_init(BQ, Q, Recover) ->
Self = self(),
@@ -423,10 +424,11 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
all_ch_record() -> [C || {{ch, _}, C} <- get()].
-block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry, State) ->
+block_consumer(C = #cr{blocked_consumers = Blocked},
+ {_ChPid, #consumer{tag = CTag}} = QEntry, State) ->
Blocked1 = priority_queue:in(QEntry, consumer_priority(QEntry), Blocked),
update_ch_record(C#cr{blocked_consumers = Blocked1}),
- notify_decorators(consumer_blocked, [{q_entry, QEntry}], State).
+ notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State).
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).