diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-10-30 13:20:09 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-10-30 13:20:09 +0000 |
commit | 5f6bdb602b6be4ee69dccba9e87b55621a981686 (patch) | |
tree | 3f75b3ab8b14ca00954f2765e9ac75d09746df6e | |
parent | 3a8414a766f31afe217996488a9efc36754bf0b4 (diff) | |
download | rabbitmq-server-bug25858.tar.gz |
Track payload size of messages (ignoring sharing)bug25858
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 5 | ||||
-rw-r--r-- | src/rabbit_types.erl | 3 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 16 |
4 files changed, 22 insertions, 4 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index bd7a0eed..c91f3ae5 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -67,7 +67,7 @@ -record(runtime_parameters, {key, value}). -record(basic_message, {exchange_name, routing_keys = [], content, id, - is_persistent}). + is_persistent, payload_size}). -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, sender, message, msg_seq_no}). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 2e825536..aef316d3 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -149,13 +149,16 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} headers = Headers0}}) end. -message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> +message(XName, RoutingKey, + #content{properties = Props, + payload_fragments_rev = PFR} = DecodedContent) -> try {ok, #basic_message{ exchange_name = XName, content = strip_header(DecodedContent, ?DELETED_HEADER), id = rabbit_guid:gen(), is_persistent = is_message_persistent(DecodedContent), + payload_size = iolist_size(PFR), routing_keys = [RoutingKey | header_routes(Props#'P_basic'.headers)]}} catch diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index a36613db..23e5e988 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -65,7 +65,8 @@ routing_keys :: [rabbit_router:routing_key()], content :: content(), id :: msg_id(), - is_persistent :: boolean()}). + is_persistent :: boolean(), + payload_size :: pos_integer()}). -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: boolean(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac2b9f52..244bd5e5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0]). + is_duplicate/2, multiple_routing_keys/0, payload_size/0]). -export([start/1, stop/0]). @@ -316,6 +316,7 @@ %%---------------------------------------------------------------------------- -rabbit_upgrade({multiple_routing_keys, local, []}). +-rabbit_upgrade({payload_size, local, []}). -ifdef(use_specs). @@ -374,6 +375,7 @@ -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(multiple_routing_keys/0 :: () -> 'ok'). +-spec(payload_size/0 :: () -> 'ok'). -endif. @@ -1781,6 +1783,18 @@ multiple_routing_keys() -> end), ok. +payload_size() -> + transform_storage( + fun ({basic_message, ExchangeName, RoutingKeys, + {content, _ClassId, _Properties, _PropertiesBin, _Protocol, + PayloadFragmentsRev} = Content, + MsgId, Persistent}) -> + {ok, {basic_message, ExchangeName, RoutingKeys, + Content, MsgId, Persistent, + iolist_size(PayloadFragmentsRev)}}; + (_) -> {error, corrupt_message} + end), + ok. %% Assumes message store is not running transform_storage(TransformFun) -> |