diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-07 15:13:48 +0000 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-07 15:13:48 +0000 |
commit | b19872da69e42922c0ec95f07022b78dd367f7c0 (patch) | |
tree | accd32707f28f60bb2a845d81e1a587cbe9456fa | |
parent | 2fdee97e0f1cad03faf1827eb6f43ec78c1d3e1f (diff) | |
parent | a8420cfc3361ccc308f8ff4ae9d7289fd62614e0 (diff) | |
download | rabbitmq-server-b19872da69e42922c0ec95f07022b78dd367f7c0.tar.gz |
Merged default.
-rw-r--r-- | src/rabbit_basic.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 3 | ||||
-rw-r--r-- | src/rabbit_guid.erl | 81 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 5 |
6 files changed, 79 insertions, 34 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index e645a9ee..b8211d43 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -139,7 +139,7 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> {ok, #basic_message{ exchange_name = XName, content = strip_header(DecodedContent, ?DELETED_HEADER), - id = rabbit_guid:guid(), + id = rabbit_guid:gen(), is_persistent = is_message_persistent(DecodedContent), routing_keys = [RoutingKey | header_routes(Props#'P_basic'.headers)]}} diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f17f98ca..af8d4c3a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -746,7 +746,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of - <<>> -> rabbit_guid:binstring_guid("amq.ctag"); + <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), + "amq.ctag"); Other -> Other end, @@ -975,7 +976,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin, false -> none end, ActualNameBin = case QueueNameBin of - <<>> -> rabbit_guid:binstring_guid("amq.gen"); + <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), + "amq.gen"); Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 3ac6ae74..84f4f8a9 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -250,7 +250,7 @@ remove_all(Table, Pattern) -> mnesia:match_object(Table, Pattern, write)). new_node_id() -> - rabbit_guid:guid(). + rabbit_guid:gen(). split_topic_key(Key) -> split_topic_key(Key, [], []). @@ -263,4 +263,3 @@ split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) -> split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]); split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) -> split_topic_key(Rest, [C | RevWordAcc], RevResAcc). - diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 70772ccd..42a506d9 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -19,7 +19,7 @@ -behaviour(gen_server). -export([start_link/0]). --export([guid/0, string_guid/1, binstring_guid/1]). +-export([gen/0, gen_secure/0, string/2, binary/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -38,9 +38,10 @@ -type(guid() :: binary()). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(guid/0 :: () -> guid()). --spec(string_guid/1 :: (any()) -> string()). --spec(binstring_guid/1 :: (any()) -> binary()). +-spec(gen/0 :: () -> guid()). +-spec(gen_secure/0 :: () -> guid()). +-spec(string/2 :: (guid(), any()) -> string()). +-spec(binary/2 :: (guid(), any()) -> binary()). -endif. @@ -65,11 +66,8 @@ update_disk_serial() -> end, Serial. -%% generate a GUID. -%% -%% The id is only unique within a single cluster and as long as the -%% serial store hasn't been deleted. -guid() -> +%% Generate an un-hashed guid. +fresh() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a %% restart, or ids were generated at a high rate (which causes @@ -78,29 +76,74 @@ guid() -> %% %% A persisted serial number, the node, and a unique reference %% (per node incarnation) uniquely identifies a process in space - %% and time. We combine that with a process-local counter to give - %% us a GUID. - G = case get(guid) of - undefined -> Serial = gen_server:call(?SERVER, serial, infinity), - {{Serial, node(), make_ref()}, 0}; + %% and time. + Serial = gen_server:call(?SERVER, serial, infinity), + {Serial, node(), make_ref()}. + +advance_blocks({B1, B2, B3, B4}, I) -> + %% To produce a new set of blocks, we create a new 32bit block hashing {B5, + %% I}. The new hash is used as last block, and the other three blocks are + %% XORed with it. + %% Doing this is convenient because it avoids cascading conflits, while + %% being very fast. The conflicts are avoided by propagating the changes + %% through all the blocks at each round by XORing, so the only occasion in + %% which a collision will take place is when all 4 blocks are the same and + %% the counter is the same. + %% The range (2^32) is provided explicitly since phash uses 2^27 by default. + B5 = erlang:phash2({B1, I}, 4294967296), + {{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}. + +blocks_to_binary({B1, B2, B3, B4}) -> + <<B1:32/integer,B2:32/integer, B3:32/integer,B4:32/integer>>. + +%% generate a GUID. This function should be used when performance is a +%% priority and predictability is not an issue. Otherwise, use gen_secure/0. +gen() -> + %% We hash a fresh GUID with md5, split it in 4 blocks, and each time we + %% need a new guid we rotate them producing a new hash with the aid of the + %% counter. Look at the comments in advance_blocks/2 for details. + {BS, I} = + case get(guid) of + undefined -> + G = fresh(), + <<B1:32/integer,B2:32/integer, B3:32/integer,B4:32/integer>> = + erlang:md5(term_to_binary(G)), + {{B1,B2,B3,B4}, 0}; + {BS0, I0} -> + advance_blocks(BS0, I0) + end, + put(guid, {BS, I}), + blocks_to_binary(BS). + +%% generate a non-predictable GUID. +%% +%% The id is only unique within a single cluster and as long as the +%% serial store hasn't been deleted. +%% +%% If you are not concerned with predictability, gen/0 is faster. +gen_secure() -> + %% Here instead of hashing once we hash the GUID and the counter each time, + %% so that the GUID is not predictable. + G = case get(guid_secure) of + undefined -> {fresh(), 0}; {S, I} -> {S, I+1} end, - put(guid, G), + put(guid_secure, G), erlang:md5(term_to_binary(G)). %% generate a readable string representation of a GUID. %% %% employs base64url encoding, which is safer in more contexts than %% plain base64. -string_guid(Prefix) -> +string(G, Prefix) -> Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; ($\/, Acc) -> [$\_ | Acc]; ($\=, Acc) -> Acc; (Chr, Acc) -> [Chr | Acc] - end, [], base64:encode_to_string(guid())). + end, [], base64:encode_to_string(G)). -binstring_guid(Prefix) -> - list_to_binary(string_guid(Prefix)). +binary(G, Prefix) -> + list_to_binary(string(G, Prefix)). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c645a436..7a96af26 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1786,10 +1786,10 @@ test_msg_store() -> restart_msg_store_empty(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds), - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), {Cap, MSCState} = msg_store_client_init_capture( ?PERSISTENT_MSG_STORE, Ref), - Ref2 = rabbit_guid:guid(), + Ref2 = rabbit_guid:gen(), {Cap2, MSC2State} = msg_store_client_init_capture( ?PERSISTENT_MSG_STORE, Ref2), %% check we don't contain any of the msgs we're about to publish @@ -1941,7 +1941,7 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) -> passed. test_msg_store_confirm_timer() -> - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), MsgId = msg_id_bin(1), Self = self(), MSCState = rabbit_msg_store:client_init( @@ -1970,7 +1970,7 @@ msg_store_keep_busy_until_confirm(MsgIds, MSCState) -> test_msg_store_client_delete_and_terminate() -> restart_msg_store_empty(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)], - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), ok = msg_store_write(MsgIds, MSCState), %% test the 'dying client' fast path for writes @@ -1986,7 +1986,7 @@ test_queue() -> init_test_queue() -> TestQueue = test_queue(), Terms = rabbit_queue_index:shutdown_terms(TestQueue), - PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), + PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), Res = rabbit_queue_index:recover( TestQueue, Terms, false, @@ -2020,7 +2020,7 @@ restart_app() -> rabbit:start(). queue_index_publish(SeqIds, Persistent, Qi) -> - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), MsgStore = case Persistent of true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE @@ -2029,7 +2029,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> {A, B = [{_SeqId, LastMsgIdWritten} | _]} = lists:foldl( fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> - MsgId = rabbit_guid:guid(), + MsgId = rabbit_guid:gen(), QiM = rabbit_queue_index:publish( MsgId, SeqId, #message_properties{}, Persistent, QiN), ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), @@ -2052,7 +2052,7 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> - MsgId = rabbit_guid:guid(), + MsgId = rabbit_guid:gen(), Props = #message_properties{expiry=12345}, Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0), {[{MsgId, 1, Props, _, _}], Qi2} = diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ea7f0c78..52eb168a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -434,7 +434,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, Terms1} = case proplists:get_value(persistent_ref, Terms) of - undefined -> {rabbit_guid:guid(), []}; + undefined -> {rabbit_guid:gen(), []}; PRef1 -> {PRef1, Terms} end, PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, @@ -860,7 +860,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> Res. msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> - msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback). + msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, + Callback). msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), |