summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-07-28 18:08:36 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-07-28 18:08:36 +0100
commitab6b089c195662804fd9dce6f78c4dbc3c25ee91 (patch)
tree40221955ef4e4340aa79faa03cd48699eb85d684
parent4fa61208e945b600b9e81877c3410dead6cf326f (diff)
downloadrabbitmq-server-ab6b089c195662804fd9dce6f78c4dbc3c25ee91.tar.gz
employ more sophisticated message queue formatting
...in the channel, queue and msg_store
-rw-r--r--src/priority_queue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl17
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_misc.erl23
-rw-r--r--src/rabbit_msg_store.erl5
5 files changed, 35 insertions, 17 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 34787903..4fc8b469 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -47,6 +47,9 @@
-ifdef(use_specs).
+-export_type([q/0]).
+
+-type(q() :: pqueue()).
-type(priority() :: integer() | 'infinity').
-type(squeue() :: {queue, [any()], [any()]}).
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4492bbd8..c6019413 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -31,12 +31,10 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2]).
+ prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
-export([init_with_backing_queue_state/7]).
--export([format_message_queue/2]).
-
%% Queue's state
-record(q, {q,
exclusive_consumer,
@@ -1165,15 +1163,4 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
backing_queue_state = BQS3},
{hibernate, stop_rate_timer(State1)}.
-format_message_queue(_Opt, Mailbox) ->
- Len = priority_queue:len(Mailbox),
- {Len,
- case Len > 100 of
- false -> priority_queue:to_list(Mailbox);
- true -> {summary,
- orddict:to_list(
- lists:foldl(
- fun ({P, _V}, Counts) ->
- orddict:update_counter(P, 1, Counts)
- end, orddict:new(), priority_queue:to_list(Mailbox)))}
- end}.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f398fcc5..13fb7ce1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -27,7 +27,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2]).
+ prioritise_cast/2, format_message_queue/2]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter_pid, start_limiter_fun, tx_status, next_tag,
@@ -344,6 +344,8 @@ terminate(Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> reply(Reply, [], NewState).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index b6b97f6d..3bbfb1d7 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -57,6 +57,7 @@
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
-export([pget/2, pget/3, pget_or_die/2]).
+-export([format_message_queue/2]).
%%----------------------------------------------------------------------------
@@ -205,6 +206,7 @@
-spec(pget/2 :: (term(), [term()]) -> term()).
-spec(pget/3 :: (term(), [term()], term()) -> term()).
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
+-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-endif.
@@ -919,3 +921,24 @@ pget_or_die(K, P) ->
undefined -> exit({error, key_missing, K});
V -> V
end.
+
+format_message_queue(_Opt, MQ) ->
+ Len = priority_queue:len(MQ),
+ {Len,
+ case Len > 100 of
+ false -> priority_queue:to_list(MQ);
+ true -> {summary,
+ orddict:to_list(
+ lists:foldl(
+ fun ({P, V}, Counts) ->
+ orddict:update_counter(
+ {P, format_message_queue_entry(V)}, 1, Counts)
+ end, orddict:new(), priority_queue:to_list(MQ)))}
+ end}.
+
+format_message_queue_entry(V) when is_atom(V) ->
+ V;
+format_message_queue_entry(V) when is_tuple(V) ->
+ list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]);
+format_message_queue_entry(_V) ->
+ '_'.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 3f4162cd..27de1f77 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -29,7 +29,8 @@
-export([transform_dir/3, force_recovery/2]). %% upgrade
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
+ terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2,
+ format_message_queue/2]).
%%----------------------------------------------------------------------------
@@ -836,6 +837,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------