diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-26 14:09:40 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-26 14:09:40 +0100 |
commit | 8b2f8c068c6fa4a38025b8ced4c290c824fd4129 (patch) | |
tree | d97338f3130ea9a9a1d67de10e09021fe9ef0134 | |
parent | 2445aa040757f9a33d7ddcf135022c4a911ff803 (diff) | |
download | rabbitmq-server-8b2f8c068c6fa4a38025b8ced4c290c824fd4129.tar.gz |
md5sum the queue index directory. Also detect and crash if a collision occursbug23019
-rw-r--r-- | src/rabbit_queue_index.erl | 21 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
3 files changed, 15 insertions, 12 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 91b19976..d6b8bb28 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/3, terminate/2, delete_and_terminate/1, publish/4, +-export([init/4, terminate/2, delete_and_terminate/1, publish/4, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -193,7 +193,7 @@ {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). --spec(init/3 :: (rabbit_amqqueue:name(), boolean(), +-spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), fun ((rabbit_guid:guid()) -> boolean())) -> {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). @@ -220,8 +220,8 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, MsgStoreRecovered, ContainsCheckFun) -> - State = #qistate { dir = Dir } = blank_state(Name), +init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) -> + State = #qistate { dir = Dir } = blank_state(Name, not Recover), Terms = case read_shutdown_terms(Dir) of {error, _} -> []; {ok, Terms1} -> Terms1 @@ -356,9 +356,14 @@ recover(DurableQueues) -> %% startup and shutdown %%---------------------------------------------------------------------------- -blank_state(QueueName) -> +blank_state(QueueName, EnsureFresh) -> StrName = queue_name_to_dir_name(QueueName), Dir = filename:join(queues_dir(), StrName), + ok = case EnsureFresh of + true -> false = filelib:is_file(Dir), %% is_file == is file or dir + ok; + false -> ok + end, ok = filelib:ensure_dir(filename:join(Dir, "nothing")), {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), @@ -463,9 +468,7 @@ recover_message(false, _, no_del, RelSeq, Segment) -> add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). queue_name_to_dir_name(Name = #resource { kind = queue }) -> - Bin = term_to_binary(Name), - Size = 8*size(Bin), - <<Num:Size>> = Bin, + <<Num:128>> = erlang:md5(term_to_binary(Name)), lists:flatten(io_lib:format("~.36B", [Num])). queues_dir() -> @@ -497,7 +500,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(QueueName)), + recover_journal(blank_state(QueueName, false)), [ok = segment_entries_foldr( fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) -> gatherer:in(Gatherer, {Guid, 1}); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 516e9134..0b92682a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1546,7 +1546,7 @@ test_queue() -> init_test_queue() -> rabbit_queue_index:init( - test_queue(), false, + test_queue(), true, false, fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 92ffc511..0f52eee8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -368,10 +368,10 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, _Recover) -> +init(QueueName, IsDurable, Recover) -> {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( - QueueName, + QueueName, Recover, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) |