diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-07-28 18:08:36 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-07-28 18:08:36 +0100 |
commit | ab6b089c195662804fd9dce6f78c4dbc3c25ee91 (patch) | |
tree | 40221955ef4e4340aa79faa03cd48699eb85d684 | |
parent | 4fa61208e945b600b9e81877c3410dead6cf326f (diff) | |
download | rabbitmq-server-ab6b089c195662804fd9dce6f78c4dbc3c25ee91.tar.gz |
employ more sophisticated message queue formatting
...in the channel, queue and msg_store
-rw-r--r-- | src/priority_queue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 17 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 23 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 5 |
5 files changed, 35 insertions, 17 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 34787903..4fc8b469 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -47,6 +47,9 @@ -ifdef(use_specs). +-export_type([q/0]). + +-type(q() :: pqueue()). -type(priority() :: integer() | 'infinity'). -type(squeue() :: {queue, [any()], [any()]}). -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4492bbd8..c6019413 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -31,12 +31,10 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2]). + prioritise_cast/2, prioritise_info/2, format_message_queue/2]). -export([init_with_backing_queue_state/7]). --export([format_message_queue/2]). - %% Queue's state -record(q, {q, exclusive_consumer, @@ -1165,15 +1163,4 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, backing_queue_state = BQS3}, {hibernate, stop_rate_timer(State1)}. -format_message_queue(_Opt, Mailbox) -> - Len = priority_queue:len(Mailbox), - {Len, - case Len > 100 of - false -> priority_queue:to_list(Mailbox); - true -> {summary, - orddict:to_list( - lists:foldl( - fun ({P, _V}, Counts) -> - orddict:update_counter(P, 1, Counts) - end, orddict:new(), priority_queue:to_list(Mailbox)))} - end}. +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f398fcc5..13fb7ce1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -27,7 +27,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2]). + prioritise_cast/2, format_message_queue/2]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter_pid, start_limiter_fun, tx_status, next_tag, @@ -344,6 +344,8 @@ terminate(Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> reply(Reply, [], NewState). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b6b97f6d..3bbfb1d7 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -57,6 +57,7 @@ -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). -export([pget/2, pget/3, pget_or_die/2]). +-export([format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -205,6 +206,7 @@ -spec(pget/2 :: (term(), [term()]) -> term()). -spec(pget/3 :: (term(), [term()], term()) -> term()). -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). +-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -endif. @@ -919,3 +921,24 @@ pget_or_die(K, P) -> undefined -> exit({error, key_missing, K}); V -> V end. + +format_message_queue(_Opt, MQ) -> + Len = priority_queue:len(MQ), + {Len, + case Len > 100 of + false -> priority_queue:to_list(MQ); + true -> {summary, + orddict:to_list( + lists:foldl( + fun ({P, V}, Counts) -> + orddict:update_counter( + {P, format_message_queue_entry(V)}, 1, Counts) + end, orddict:new(), priority_queue:to_list(MQ)))} + end}. + +format_message_queue_entry(V) when is_atom(V) -> + V; +format_message_queue_entry(V) when is_tuple(V) -> + list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]); +format_message_queue_entry(_V) -> + '_'. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 3f4162cd..27de1f77 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -29,7 +29,8 @@ -export([transform_dir/3, force_recovery/2]). %% upgrade -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). + terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2, + format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -836,6 +837,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, code_change(_OldVsn, State, _Extra) -> {ok, State}. +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + %%---------------------------------------------------------------------------- %% general helper functions %%---------------------------------------------------------------------------- |