From 250667f80cffb7f6a63d049e1480df42d7460b6f Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 5 Dec 2014 14:44:31 +0000 Subject: Introduce a selection mechanism so that small messages go to the index, and large ones go to the store. --- ebin/rabbit_app.in | 1 + src/rabbit_variable_queue.erl | 46 +++++++++++++++++++++++++++++++------------ 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 9e5584a1..5ebef608 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -29,6 +29,7 @@ {heartbeat, 580}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 65536}, + {queue_index_embed_msgs_below, 1024}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_user_tags, [administrator]}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 884342d8..7a7a2900 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -989,7 +989,11 @@ beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> index_on_disk = true, msg_props = MsgProps}. -trim_msg_status(MsgStatus) -> MsgStatus.%% TODO #msg_status { msg = undefined }. +trim_msg_status(MsgStatus = #msg_status{msg_props = MsgProps}) -> + case persist_to(MsgProps) of + msg_store -> MsgStatus#msg_status{msg = undefined}; + queue_index -> MsgStatus + end. with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> {Result, MSCStateP1} = Fun(MSCStateP), @@ -1325,14 +1329,15 @@ maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, + msg_props = MsgProps, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - %% Msg1 = Msg #basic_message { - %% %% don't persist any recoverable decoded properties - %% content = rabbit_binary_parser:clear_decoded_content( - %% Msg #basic_message.content)}, - %% ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1), - MsgStatus; %% #msg_status { msg_on_disk = true }; + case persist_to(MsgProps) of + msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId, + prepare_to_store(Msg)), + MsgStatus#msg_status{msg_in_store = true}; + queue_index -> MsgStatus + end; maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> MsgStatus. @@ -1347,13 +1352,12 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { is_delivered = IsDelivered, msg_props = MsgProps}, IndexState) when Force orelse IsPersistent -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, IndexState1 = rabbit_queue_index:publish( - Msg1, SeqId, MsgProps, IsPersistent, IndexState), - {MsgStatus #msg_status { index_on_disk = true }, + case persist_to(MsgProps) of + msg_store -> MsgId; + queue_index -> prepare_to_store(Msg) + end, SeqId, MsgProps, IsPersistent, IndexState), + {MsgStatus#msg_status{index_on_disk = true}, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> {MsgStatus, IndexState}. @@ -1366,6 +1370,22 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), {MsgStatus2, State #vqstate { index_state = IndexState1 }}. +persist_to(#message_properties{size = Size}) -> + {ok, IndexMaxSize} = application:get_env( + rabbit, queue_index_embed_msgs_below), + %% This is >= so that you can set the env to 0 and never persist + %% to the index. + case Size >= IndexMaxSize of + true -> msg_store; + false -> queue_index + end. + +prepare_to_store(Msg) -> + Msg#basic_message{ + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}. + %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- -- cgit v1.2.1