summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-07 15:13:48 +0000
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-07 15:13:48 +0000
commitb19872da69e42922c0ec95f07022b78dd367f7c0 (patch)
treeaccd32707f28f60bb2a845d81e1a587cbe9456fa
parent2fdee97e0f1cad03faf1827eb6f43ec78c1d3e1f (diff)
parenta8420cfc3361ccc308f8ff4ae9d7289fd62614e0 (diff)
downloadrabbitmq-server-b19872da69e42922c0ec95f07022b78dd367f7c0.tar.gz
Merged default.
-rw-r--r--src/rabbit_basic.erl2
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_exchange_type_topic.erl3
-rw-r--r--src/rabbit_guid.erl81
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl5
6 files changed, 79 insertions, 34 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index e645a9ee..b8211d43 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -139,7 +139,7 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
{ok, #basic_message{
exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
- id = rabbit_guid:guid(),
+ id = rabbit_guid:gen(),
is_persistent = is_message_persistent(DecodedContent),
routing_keys = [RoutingKey |
header_routes(Props#'P_basic'.headers)]}}
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f17f98ca..af8d4c3a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -746,7 +746,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
- <<>> -> rabbit_guid:binstring_guid("amq.ctag");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.ctag");
Other -> Other
end,
@@ -975,7 +976,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
false -> none
end,
ActualNameBin = case QueueNameBin of
- <<>> -> rabbit_guid:binstring_guid("amq.gen");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 3ac6ae74..84f4f8a9 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -250,7 +250,7 @@ remove_all(Table, Pattern) ->
mnesia:match_object(Table, Pattern, write)).
new_node_id() ->
- rabbit_guid:guid().
+ rabbit_guid:gen().
split_topic_key(Key) ->
split_topic_key(Key, [], []).
@@ -263,4 +263,3 @@ split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
-
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 70772ccd..42a506d9 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -19,7 +19,7 @@
-behaviour(gen_server).
-export([start_link/0]).
--export([guid/0, string_guid/1, binstring_guid/1]).
+-export([gen/0, gen_secure/0, string/2, binary/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -38,9 +38,10 @@
-type(guid() :: binary()).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(guid/0 :: () -> guid()).
--spec(string_guid/1 :: (any()) -> string()).
--spec(binstring_guid/1 :: (any()) -> binary()).
+-spec(gen/0 :: () -> guid()).
+-spec(gen_secure/0 :: () -> guid()).
+-spec(string/2 :: (guid(), any()) -> string()).
+-spec(binary/2 :: (guid(), any()) -> binary()).
-endif.
@@ -65,11 +66,8 @@ update_disk_serial() ->
end,
Serial.
-%% generate a GUID.
-%%
-%% The id is only unique within a single cluster and as long as the
-%% serial store hasn't been deleted.
-guid() ->
+%% Generate an un-hashed guid.
+fresh() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
%% restart, or ids were generated at a high rate (which causes
@@ -78,29 +76,74 @@ guid() ->
%%
%% A persisted serial number, the node, and a unique reference
%% (per node incarnation) uniquely identifies a process in space
- %% and time. We combine that with a process-local counter to give
- %% us a GUID.
- G = case get(guid) of
- undefined -> Serial = gen_server:call(?SERVER, serial, infinity),
- {{Serial, node(), make_ref()}, 0};
+ %% and time.
+ Serial = gen_server:call(?SERVER, serial, infinity),
+ {Serial, node(), make_ref()}.
+
+advance_blocks({B1, B2, B3, B4}, I) ->
+ %% To produce a new set of blocks, we create a new 32bit block hashing {B5,
+ %% I}. The new hash is used as last block, and the other three blocks are
+ %% XORed with it.
+ %% Doing this is convenient because it avoids cascading conflits, while
+ %% being very fast. The conflicts are avoided by propagating the changes
+ %% through all the blocks at each round by XORing, so the only occasion in
+ %% which a collision will take place is when all 4 blocks are the same and
+ %% the counter is the same.
+ %% The range (2^32) is provided explicitly since phash uses 2^27 by default.
+ B5 = erlang:phash2({B1, I}, 4294967296),
+ {{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}.
+
+blocks_to_binary({B1, B2, B3, B4}) ->
+ <<B1:32/integer,B2:32/integer, B3:32/integer,B4:32/integer>>.
+
+%% generate a GUID. This function should be used when performance is a
+%% priority and predictability is not an issue. Otherwise, use gen_secure/0.
+gen() ->
+ %% We hash a fresh GUID with md5, split it in 4 blocks, and each time we
+ %% need a new guid we rotate them producing a new hash with the aid of the
+ %% counter. Look at the comments in advance_blocks/2 for details.
+ {BS, I} =
+ case get(guid) of
+ undefined ->
+ G = fresh(),
+ <<B1:32/integer,B2:32/integer, B3:32/integer,B4:32/integer>> =
+ erlang:md5(term_to_binary(G)),
+ {{B1,B2,B3,B4}, 0};
+ {BS0, I0} ->
+ advance_blocks(BS0, I0)
+ end,
+ put(guid, {BS, I}),
+ blocks_to_binary(BS).
+
+%% generate a non-predictable GUID.
+%%
+%% The id is only unique within a single cluster and as long as the
+%% serial store hasn't been deleted.
+%%
+%% If you are not concerned with predictability, gen/0 is faster.
+gen_secure() ->
+ %% Here instead of hashing once we hash the GUID and the counter each time,
+ %% so that the GUID is not predictable.
+ G = case get(guid_secure) of
+ undefined -> {fresh(), 0};
{S, I} -> {S, I+1}
end,
- put(guid, G),
+ put(guid_secure, G),
erlang:md5(term_to_binary(G)).
%% generate a readable string representation of a GUID.
%%
%% employs base64url encoding, which is safer in more contexts than
%% plain base64.
-string_guid(Prefix) ->
+string(G, Prefix) ->
Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
($\/, Acc) -> [$\_ | Acc];
($\=, Acc) -> Acc;
(Chr, Acc) -> [Chr | Acc]
- end, [], base64:encode_to_string(guid())).
+ end, [], base64:encode_to_string(G)).
-binstring_guid(Prefix) ->
- list_to_binary(string_guid(Prefix)).
+binary(G, Prefix) ->
+ list_to_binary(string(G, Prefix)).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c645a436..7a96af26 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1786,10 +1786,10 @@ test_msg_store() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
{Cap, MSCState} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref),
- Ref2 = rabbit_guid:guid(),
+ Ref2 = rabbit_guid:gen(),
{Cap2, MSC2State} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref2),
%% check we don't contain any of the msgs we're about to publish
@@ -1941,7 +1941,7 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) ->
passed.
test_msg_store_confirm_timer() ->
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
MSCState = rabbit_msg_store:client_init(
@@ -1970,7 +1970,7 @@ msg_store_keep_busy_until_confirm(MsgIds, MSCState) ->
test_msg_store_client_delete_and_terminate() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)],
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
ok = msg_store_write(MsgIds, MSCState),
%% test the 'dying client' fast path for writes
@@ -1986,7 +1986,7 @@ test_queue() ->
init_test_queue() ->
TestQueue = test_queue(),
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
- PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
+ PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
TestQueue, Terms, false,
@@ -2020,7 +2020,7 @@ restart_app() ->
rabbit:start().
queue_index_publish(SeqIds, Persistent, Qi) ->
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MsgStore = case Persistent of
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
@@ -2029,7 +2029,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
{A, B = [{_SeqId, LastMsgIdWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
- MsgId = rabbit_guid:guid(),
+ MsgId = rabbit_guid:gen(),
QiM = rabbit_queue_index:publish(
MsgId, SeqId, #message_properties{}, Persistent, QiN),
ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
@@ -2052,7 +2052,7 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
- MsgId = rabbit_guid:guid(),
+ MsgId = rabbit_guid:gen(),
Props = #message_properties{expiry=12345},
Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
{[{MsgId, 1, Props, _, _}], Qi2} =
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ea7f0c78..52eb168a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -434,7 +434,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, Terms1} =
case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:guid(), []};
+ undefined -> {rabbit_guid:gen(), []};
PRef1 -> {PRef1, Terms}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
@@ -860,7 +860,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
Res.
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
- msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
+ msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
+ Callback).
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),