summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-09-08 11:18:26 +0100
committerMatthias Radestock <matthias@lshift.net>2009-09-08 11:18:26 +0100
commit4858a7dbbaa8a79fb38be73fa31f332f1fde6e77 (patch)
tree04f8238ebd83142d73b6203e0ca8ab5e1e5f48b2
parent01e69ce9002ecf1b2f27738a2cf1a72ecb834c72 (diff)
downloadrabbitmq-server-4858a7dbbaa8a79fb38be73fa31f332f1fde6e77.tar.gz
change guid to a binary, using the md5 of term_to_binary
The main motivation is to reduce the memory and on-disk footprint of the guid from ~34 bytes to 16. But it turns out that this actually results in a speed improvement of a few percent as well, even for non-persistent messaging, presumably due to the memory management effects and the fact that 16 byte binaries are easier to copy between processes than the deep(ish) original guid structure.
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_guid.erl4
-rw-r--r--src/rabbit_msg_file.erl37
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_tests.erl38
5 files changed, 44 insertions, 39 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index c17ac7eb..095044e7 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -89,7 +89,7 @@
-type(file_open_mode() :: any()).
%% this is really an abstract type, but dialyzer does not support them
--type(guid() :: any()).
+-type(guid() :: binary()).
-type(txn() :: guid()).
-type(pkey() :: guid()).
-type(r(Kind) ::
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 45816b85..5053d188 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -99,7 +99,7 @@ guid() ->
{S, I} -> {S, I+1}
end,
put(guid, G),
- G.
+ erlang:md5(term_to_binary(G)).
%% generate a readable string representation of a guid. Note that any
%% monotonicity of the guid is not preserved in the encoding.
@@ -110,7 +110,7 @@ string_guid(Prefix) ->
%%
%% TODO: once debian stable and EPEL have moved from R11B-2 to
%% R11B-4 or later we should change this to use base64.
- Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))).
+ Prefix ++ "-" ++ ssl_base64:encode(guid()).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index f14656cf..46128612 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -46,7 +46,7 @@
-ifdef(use_specs).
-type(io_device() :: any()).
--type(msg_id() :: any()).
+-type(msg_id() :: binary()).
-type(msg() :: any()).
-type(msg_attrs() :: any()).
-type(position() :: non_neg_integer()).
@@ -63,16 +63,16 @@
%%----------------------------------------------------------------------------
-append(FileHdl, MsgId, MsgBody, MsgAttrs) ->
- [MsgIdBin, MsgBodyBin, MsgAttrsBin] = Bins =
- [term_to_binary(X) || X <- [MsgId, MsgBody, MsgAttrs]],
- [MsgIdBinSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes =
- [size(B) || B <- Bins],
+append(FileHdl, MsgId, MsgBody, MsgAttrs) when is_binary(MsgId) ->
+ MsgBodyBin = term_to_binary(MsgBody),
+ MsgAttrsBin = term_to_binary(MsgAttrs),
+ [MsgIdSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes =
+ [size(B) || B <- [MsgId, MsgBodyBin, MsgAttrsBin]],
Size = lists:sum(Sizes),
case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgIdSize:?INTEGER_SIZE_BITS,
MsgAttrsBinSize:?INTEGER_SIZE_BITS,
- MsgIdBin:MsgIdBinSize/binary,
+ MsgId:MsgIdSize/binary,
MsgAttrsBin:MsgAttrsBinSize/binary,
MsgBodyBin:MsgBodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
@@ -85,17 +85,16 @@ read(FileHdl, TotalSize) ->
SizeWriteOkBytes = Size + 1,
case file:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgIdSize:?INTEGER_SIZE_BITS,
MsgAttrsBinSize:?INTEGER_SIZE_BITS,
Rest:SizeWriteOkBytes/binary>>} ->
- BodyBinSize = Size - MsgIdBinSize - MsgAttrsBinSize,
- <<MsgIdBin:MsgIdBinSize/binary,
+ BodyBinSize = Size - MsgIdSize - MsgAttrsBinSize,
+ <<MsgId:MsgIdSize/binary,
MsgAttrsBin:MsgAttrsBinSize/binary,
MsgBodyBin:BodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>> = Rest,
- [MsgId, MsgBody, MsgAttrs] =
- [binary_to_term(B) || B <- [MsgIdBin, MsgBodyBin, MsgAttrsBin]],
- {ok, {MsgId, MsgBody, MsgAttrs}};
+ {ok, {MsgId,
+ binary_to_term(MsgBodyBin), binary_to_term(MsgAttrsBin)}};
KO -> KO
end.
@@ -119,10 +118,10 @@ read_next(FileHdl, Offset) ->
case file:read(FileHdl, ThreeIntegers) of
{ok,
<<Size:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgIdSize:?INTEGER_SIZE_BITS,
MsgAttrsBinSize:?INTEGER_SIZE_BITS>>} ->
if Size == 0 -> eof; %% Nothing we can do other than stop
- MsgIdBinSize == 0 orelse MsgAttrsBinSize == 0 ->
+ MsgIdSize == 0 orelse MsgAttrsBinSize == 0 ->
%% current message corrupted, try skipping past it
ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT,
case file:position(FileHdl, {cur, Size + 1}) of
@@ -131,9 +130,9 @@ read_next(FileHdl, Offset) ->
KO -> KO
end;
true -> %% all good, let's continue
- HeaderSize = MsgIdBinSize + MsgAttrsBinSize,
+ HeaderSize = MsgIdSize + MsgAttrsBinSize,
case file:read(FileHdl, HeaderSize) of
- {ok, <<MsgIdBin:MsgIdBinSize/binary,
+ {ok, <<MsgId:MsgIdSize/binary,
MsgAttrsBin:MsgAttrsBinSize/binary>>} ->
TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
ExpectedAbsPos = Offset + TotalSize - 1,
@@ -144,7 +143,7 @@ read_next(FileHdl, Offset) ->
case file:read(FileHdl, 1) of
{ok, <<?WRITE_OK_MARKER:
?WRITE_OK_SIZE_BITS>>} ->
- {ok, {binary_to_term(MsgIdBin),
+ {ok, {MsgId,
binary_to_term(MsgAttrsBin),
TotalSize, NextOffset}};
{ok, _SomeOtherData} ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 357c4867..da904193 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -82,7 +82,7 @@
-type(mode() :: 'ram_disk' | 'disk_only').
-type(dets_table() :: any()).
-type(ets_table() :: any()).
--type(msg_id() :: any()).
+-type(msg_id() :: binary()).
-type(msg() :: any()).
-type(msg_attrs() :: any()).
-type(file_path() :: any()).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 039e9aa4..1f2187bc 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -848,8 +848,11 @@ benchmark_disk_queue() ->
passed.
rdq_message(MsgId, MsgBody, IsPersistent) ->
- rabbit_basic:message(x, <<>>, [], MsgBody, MsgId, IsPersistent).
+ rabbit_basic:message(x, <<>>, [], MsgBody, term_to_binary(MsgId),
+ IsPersistent).
+rdq_match_message(Msg, MsgId, MsgBody, Size) when is_number(MsgId) ->
+ rdq_match_message(Msg, term_to_binary(MsgId), MsgBody, Size);
rdq_match_message(
#basic_message { guid = MsgId, content =
#content { payload_fragments_rev = [MsgBody] }},
@@ -860,13 +863,17 @@ rdq_match_messages(#basic_message { guid = MsgId, content = #content { payload_f
#basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}) ->
ok.
+commit_list(List, MsgCount) ->
+ lists:zip([term_to_binary(MsgId) || MsgId <- List],
+ lists:duplicate(MsgCount, false)).
+
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
Startup = rdq_virgin(),
rdq_start(),
QCount = length(Qs),
Msg = <<0:(8*MsgSizeBytes)>>,
List = lists:seq(1, MsgCount),
- CommitList = lists:zip(List, lists:duplicate(MsgCount, false)),
+ CommitList = commit_list(List, MsgCount),
{Publish, ok} =
timer:tc(?MODULE, rdq_time_commands,
[[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false))
@@ -905,7 +912,7 @@ rdq_stress_gc(MsgCount) ->
MsgSizeBytes = 256*1024,
Msg = <<0:(8*MsgSizeBytes)>>, % 256KB
List = lists:seq(1, MsgCount),
- CommitList = lists:zip(List, lists:duplicate(MsgCount, false)),
+ CommitList = commit_list(List, MsgCount),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List],
rabbit_disk_queue:tx_commit(q, CommitList, []),
StartChunk = round(MsgCount / 20), % 5%
@@ -948,7 +955,7 @@ rdq_test_startup_with_queue_gaps() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- CommitAll = lists:zip(All, lists:duplicate(Total, false)),
+ CommitAll = commit_list(All, Total),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All],
rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
@@ -1005,7 +1012,7 @@ rdq_test_redeliver() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- CommitAll = lists:zip(All, lists:duplicate(Total, false)),
+ CommitAll = commit_list(All, Total),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
@@ -1058,7 +1065,7 @@ rdq_test_purge() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- CommitAll = lists:zip(All, lists:duplicate(Total, false)),
+ CommitAll = commit_list(All, Total),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
@@ -1170,12 +1177,10 @@ rdq_test_mixed_queue_modes() ->
rdq_test_mode_conversion_mid_txn() ->
Payload = <<0:(8*256)>>,
MsgIdsA = lists:seq(0,9),
- MsgsA = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId,
- (0 == MsgId rem 2))
- || MsgId <- MsgIdsA ],
+ MsgsA = [ rdq_message(MsgId, Payload, (0 == MsgId rem 2))
+ || MsgId <- MsgIdsA ],
MsgIdsB = lists:seq(10,20),
- MsgsB = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId,
- (0 == MsgId rem 2))
+ MsgsB = [ rdq_message(MsgId, Payload, (0 == MsgId rem 2))
|| MsgId <- MsgIdsB ],
rdq_virgin(),
@@ -1229,7 +1234,8 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
{AckTags, MS8} =
lists:foldl(
fun (Msg, {Acc, MS7}) ->
- Rem = Len1 - (Msg #basic_message.guid) - 1,
+ MsgId = binary_to_term(Msg #basic_message.guid),
+ Rem = Len1 - MsgId - 1,
{{Msg1, false, AckTag, Rem}, MS7a} =
rabbit_mixed_queue:fetch(MS7),
ok = rdq_match_messages(Msg, Msg1),
@@ -1243,7 +1249,8 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
{AckTags, MS8} =
lists:foldl(
fun (Msg, {Acc, MS7}) ->
- Rem = Len0 - (Msg #basic_message.guid) - 1,
+ MsgId = binary_to_term(Msg #basic_message.guid),
+ Rem = Len0 - MsgId - 1,
{{Msg1, false, AckTag, Rem}, MS7a} =
rabbit_mixed_queue:fetch(MS7),
ok = rdq_match_messages(Msg, Msg1),
@@ -1266,9 +1273,8 @@ rdq_test_disk_queue_modes() ->
Total = 1000,
Half1 = lists:seq(1,round(Total/2)),
Half2 = lists:seq(1 + round(Total/2), Total),
- CommitHalf1 = lists:zip(Half1, lists:duplicate(round(Total/2), false)),
- CommitHalf2 = lists:zip(Half2, lists:duplicate
- (Total - round(Total/2), false)),
+ CommitHalf1 = commit_list(Half1, round(Total/2)),
+ CommitHalf2 = commit_list(Half2, Total - round(Total/2)),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1],
ok = rabbit_disk_queue:tx_commit(q, CommitHalf1, []),
io:format("Publish done~n", []),