diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-09-08 11:18:26 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-09-08 11:18:26 +0100 |
commit | 4858a7dbbaa8a79fb38be73fa31f332f1fde6e77 (patch) | |
tree | 04f8238ebd83142d73b6203e0ca8ab5e1e5f48b2 | |
parent | 01e69ce9002ecf1b2f27738a2cf1a72ecb834c72 (diff) | |
download | rabbitmq-server-4858a7dbbaa8a79fb38be73fa31f332f1fde6e77.tar.gz |
change guid to a binary, using the md5 of term_to_binary
The main motivation is to reduce the memory and on-disk footprint of
the guid from ~34 bytes to 16. But it turns out that this actually
results in a speed improvement of a few percent as well, even for
non-persistent messaging, presumably due to the memory management
effects and the fact that 16-byte binaries are easier to copy between
processes than the deep(ish) original guid structure.
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_guid.erl | 4 | ||||
-rw-r--r-- | src/rabbit_msg_file.erl | 37 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 38 |
5 files changed, 44 insertions, 39 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index c17ac7eb..095044e7 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -89,7 +89,7 @@ -type(file_open_mode() :: any()). %% this is really an abstract type, but dialyzer does not support them --type(guid() :: any()). +-type(guid() :: binary()). -type(txn() :: guid()). -type(pkey() :: guid()). -type(r(Kind) :: diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 45816b85..5053d188 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -99,7 +99,7 @@ guid() -> {S, I} -> {S, I+1} end, put(guid, G), - G. + erlang:md5(term_to_binary(G)). %% generate a readable string representation of a guid. Note that any %% monotonicity of the guid is not preserved in the encoding. @@ -110,7 +110,7 @@ string_guid(Prefix) -> %% %% TODO: once debian stable and EPEL have moved from R11B-2 to %% R11B-4 or later we should change this to use base64. - Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))). + Prefix ++ "-" ++ ssl_base64:encode(guid()). binstring_guid(Prefix) -> list_to_binary(string_guid(Prefix)). diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index f14656cf..46128612 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -46,7 +46,7 @@ -ifdef(use_specs). -type(io_device() :: any()). --type(msg_id() :: any()). +-type(msg_id() :: binary()). -type(msg() :: any()). -type(msg_attrs() :: any()). -type(position() :: non_neg_integer()). 
@@ -63,16 +63,16 @@ %%---------------------------------------------------------------------------- -append(FileHdl, MsgId, MsgBody, MsgAttrs) -> - [MsgIdBin, MsgBodyBin, MsgAttrsBin] = Bins = - [term_to_binary(X) || X <- [MsgId, MsgBody, MsgAttrs]], - [MsgIdBinSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes = - [size(B) || B <- Bins], +append(FileHdl, MsgId, MsgBody, MsgAttrs) when is_binary(MsgId) -> + MsgBodyBin = term_to_binary(MsgBody), + MsgAttrsBin = term_to_binary(MsgAttrs), + [MsgIdSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes = + [size(B) || B <- [MsgId, MsgBodyBin, MsgAttrsBin]], Size = lists:sum(Sizes), case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgIdSize:?INTEGER_SIZE_BITS, MsgAttrsBinSize:?INTEGER_SIZE_BITS, - MsgIdBin:MsgIdBinSize/binary, + MsgId:MsgIdSize/binary, MsgAttrsBin:MsgAttrsBinSize/binary, MsgBodyBin:MsgBodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of @@ -85,17 +85,16 @@ read(FileHdl, TotalSize) -> SizeWriteOkBytes = Size + 1, case file:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgIdSize:?INTEGER_SIZE_BITS, MsgAttrsBinSize:?INTEGER_SIZE_BITS, Rest:SizeWriteOkBytes/binary>>} -> - BodyBinSize = Size - MsgIdBinSize - MsgAttrsBinSize, - <<MsgIdBin:MsgIdBinSize/binary, + BodyBinSize = Size - MsgIdSize - MsgAttrsBinSize, + <<MsgId:MsgIdSize/binary, MsgAttrsBin:MsgAttrsBinSize/binary, MsgBodyBin:BodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>> = Rest, - [MsgId, MsgBody, MsgAttrs] = - [binary_to_term(B) || B <- [MsgIdBin, MsgBodyBin, MsgAttrsBin]], - {ok, {MsgId, MsgBody, MsgAttrs}}; + {ok, {MsgId, + binary_to_term(MsgBodyBin), binary_to_term(MsgAttrsBin)}}; KO -> KO end. 
@@ -119,10 +118,10 @@ read_next(FileHdl, Offset) -> case file:read(FileHdl, ThreeIntegers) of {ok, <<Size:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgIdSize:?INTEGER_SIZE_BITS, MsgAttrsBinSize:?INTEGER_SIZE_BITS>>} -> if Size == 0 -> eof; %% Nothing we can do other than stop - MsgIdBinSize == 0 orelse MsgAttrsBinSize == 0 -> + MsgIdSize == 0 orelse MsgAttrsBinSize == 0 -> %% current message corrupted, try skipping past it ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT, case file:position(FileHdl, {cur, Size + 1}) of @@ -131,9 +130,9 @@ read_next(FileHdl, Offset) -> KO -> KO end; true -> %% all good, let's continue - HeaderSize = MsgIdBinSize + MsgAttrsBinSize, + HeaderSize = MsgIdSize + MsgAttrsBinSize, case file:read(FileHdl, HeaderSize) of - {ok, <<MsgIdBin:MsgIdBinSize/binary, + {ok, <<MsgId:MsgIdSize/binary, MsgAttrsBin:MsgAttrsBinSize/binary>>} -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, ExpectedAbsPos = Offset + TotalSize - 1, @@ -144,7 +143,7 @@ read_next(FileHdl, Offset) -> case file:read(FileHdl, 1) of {ok, <<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} -> - {ok, {binary_to_term(MsgIdBin), + {ok, {MsgId, binary_to_term(MsgAttrsBin), TotalSize, NextOffset}}; {ok, _SomeOtherData} -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 357c4867..da904193 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -82,7 +82,7 @@ -type(mode() :: 'ram_disk' | 'disk_only'). -type(dets_table() :: any()). -type(ets_table() :: any()). --type(msg_id() :: any()). +-type(msg_id() :: binary()). -type(msg() :: any()). -type(msg_attrs() :: any()). -type(file_path() :: any()). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 039e9aa4..1f2187bc 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -848,8 +848,11 @@ benchmark_disk_queue() -> passed. rdq_message(MsgId, MsgBody, IsPersistent) -> - rabbit_basic:message(x, <<>>, [], MsgBody, MsgId, IsPersistent). 
+ rabbit_basic:message(x, <<>>, [], MsgBody, term_to_binary(MsgId), + IsPersistent). +rdq_match_message(Msg, MsgId, MsgBody, Size) when is_number(MsgId) -> + rdq_match_message(Msg, term_to_binary(MsgId), MsgBody, Size); rdq_match_message( #basic_message { guid = MsgId, content = #content { payload_fragments_rev = [MsgBody] }}, @@ -860,13 +863,17 @@ rdq_match_messages(#basic_message { guid = MsgId, content = #content { payload_f #basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}) -> ok. +commit_list(List, MsgCount) -> + lists:zip([term_to_binary(MsgId) || MsgId <- List], + lists:duplicate(MsgCount, false)). + rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> Startup = rdq_virgin(), rdq_start(), QCount = length(Qs), Msg = <<0:(8*MsgSizeBytes)>>, List = lists:seq(1, MsgCount), - CommitList = lists:zip(List, lists:duplicate(MsgCount, false)), + CommitList = commit_list(List, MsgCount), {Publish, ok} = timer:tc(?MODULE, rdq_time_commands, [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) @@ -905,7 +912,7 @@ rdq_stress_gc(MsgCount) -> MsgSizeBytes = 256*1024, Msg = <<0:(8*MsgSizeBytes)>>, % 256KB List = lists:seq(1, MsgCount), - CommitList = lists:zip(List, lists:duplicate(MsgCount, false)), + CommitList = commit_list(List, MsgCount), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List], rabbit_disk_queue:tx_commit(q, CommitList, []), StartChunk = round(MsgCount / 20), % 5% @@ -948,7 +955,7 @@ rdq_test_startup_with_queue_gaps() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - CommitAll = lists:zip(All, lists:duplicate(Total, false)), + CommitAll = commit_list(All, Total), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All], rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), @@ -1005,7 +1012,7 @@ rdq_test_redeliver() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - CommitAll = lists:zip(All, 
lists:duplicate(Total, false)), + CommitAll = commit_list(All, Total), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), @@ -1058,7 +1065,7 @@ rdq_test_purge() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - CommitAll = lists:zip(All, lists:duplicate(Total, false)), + CommitAll = commit_list(All, Total), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), @@ -1170,12 +1177,10 @@ rdq_test_mixed_queue_modes() -> rdq_test_mode_conversion_mid_txn() -> Payload = <<0:(8*256)>>, MsgIdsA = lists:seq(0,9), - MsgsA = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId, - (0 == MsgId rem 2)) - || MsgId <- MsgIdsA ], + MsgsA = [ rdq_message(MsgId, Payload, (0 == MsgId rem 2)) + || MsgId <- MsgIdsA ], MsgIdsB = lists:seq(10,20), - MsgsB = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId, - (0 == MsgId rem 2)) + MsgsB = [ rdq_message(MsgId, Payload, (0 == MsgId rem 2)) || MsgId <- MsgIdsB ], rdq_virgin(), @@ -1229,7 +1234,8 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - {AckTags, MS8} = lists:foldl( fun (Msg, {Acc, MS7}) -> - Rem = Len1 - (Msg #basic_message.guid) - 1, + MsgId = binary_to_term(Msg #basic_message.guid), + Rem = Len1 - MsgId - 1, {{Msg1, false, AckTag, Rem}, MS7a} = rabbit_mixed_queue:fetch(MS7), ok = rdq_match_messages(Msg, Msg1), @@ -1243,7 +1249,8 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - {AckTags, MS8} = lists:foldl( fun (Msg, {Acc, MS7}) -> - Rem = Len0 - (Msg #basic_message.guid) - 1, + MsgId = binary_to_term(Msg #basic_message.guid), + Rem = Len0 - MsgId - 1, {{Msg1, false, AckTag, Rem}, MS7a} = rabbit_mixed_queue:fetch(MS7), ok = rdq_match_messages(Msg, Msg1), @@ -1266,9 +1273,8 @@ rdq_test_disk_queue_modes() -> Total = 1000, Half1 = 
lists:seq(1,round(Total/2)), Half2 = lists:seq(1 + round(Total/2), Total), - CommitHalf1 = lists:zip(Half1, lists:duplicate(round(Total/2), false)), - CommitHalf2 = lists:zip(Half2, lists:duplicate - (Total - round(Total/2), false)), + CommitHalf1 = commit_list(Half1, round(Total/2)), + CommitHalf2 = commit_list(Half2, Total - round(Total/2)), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1], ok = rabbit_disk_queue:tx_commit(q, CommitHalf1, []), io:format("Publish done~n", []), |