summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-10-30 13:20:09 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-10-30 13:20:09 +0000
commit5f6bdb602b6be4ee69dccba9e87b55621a981686 (patch)
tree3f75b3ab8b14ca00954f2765e9ac75d09746df6e
parent3a8414a766f31afe217996488a9efc36754bf0b4 (diff)
downloadrabbitmq-server-bug25858.tar.gz
Track payload size of messages (ignoring sharing)bug25858
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_basic.erl5
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_variable_queue.erl16
4 files changed, 22 insertions, 4 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index bd7a0eed..c91f3ae5 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -67,7 +67,7 @@
-record(runtime_parameters, {key, value}).
-record(basic_message, {exchange_name, routing_keys = [], content, id,
- is_persistent}).
+ is_persistent, payload_size}).
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, sender, message, msg_seq_no}).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 2e825536..aef316d3 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -149,13 +149,16 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
headers = Headers0}})
end.
-message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
+message(XName, RoutingKey,
+ #content{properties = Props,
+ payload_fragments_rev = PFR} = DecodedContent) ->
try
{ok, #basic_message{
exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
id = rabbit_guid:gen(),
is_persistent = is_message_persistent(DecodedContent),
+ payload_size = iolist_size(PFR),
routing_keys = [RoutingKey |
header_routes(Props#'P_basic'.headers)]}}
catch
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index a36613db..23e5e988 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -65,7 +65,8 @@
routing_keys :: [rabbit_router:routing_key()],
content :: content(),
id :: msg_id(),
- is_persistent :: boolean()}).
+ is_persistent :: boolean(),
+ payload_size :: pos_integer()}).
-type(message() :: basic_message()).
-type(delivery() ::
#delivery{mandatory :: boolean(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac2b9f52..244bd5e5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0]).
+ is_duplicate/2, multiple_routing_keys/0, payload_size/0]).
-export([start/1, stop/0]).
@@ -316,6 +316,7 @@
%%----------------------------------------------------------------------------
-rabbit_upgrade({multiple_routing_keys, local, []}).
+-rabbit_upgrade({payload_size, local, []}).
-ifdef(use_specs).
@@ -374,6 +375,7 @@
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
-spec(multiple_routing_keys/0 :: () -> 'ok').
+-spec(payload_size/0 :: () -> 'ok').
-endif.
@@ -1781,6 +1783,18 @@ multiple_routing_keys() ->
end),
ok.
+payload_size() ->
+ transform_storage(
+ fun ({basic_message, ExchangeName, RoutingKeys,
+ {content, _ClassId, _Properties, _PropertiesBin, _Protocol,
+ PayloadFragmentsRev} = Content,
+ MsgId, Persistent}) ->
+ {ok, {basic_message, ExchangeName, RoutingKeys,
+ Content, MsgId, Persistent,
+ iolist_size(PayloadFragmentsRev)}};
+ (_) -> {error, corrupt_message}
+ end),
+ ok.
%% Assumes message store is not running
transform_storage(TransformFun) ->