summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-07-26 14:09:40 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-07-26 14:09:40 +0100
commit8b2f8c068c6fa4a38025b8ced4c290c824fd4129 (patch)
treed97338f3130ea9a9a1d67de10e09021fe9ef0134
parent2445aa040757f9a33d7ddcf135022c4a911ff803 (diff)
downloadrabbitmq-server-8b2f8c068c6fa4a38025b8ced4c290c824fd4129.tar.gz
md5sum the queue index directory. Also detect and crash if a collision occursbug23019
-rw-r--r--src/rabbit_queue_index.erl21
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl4
3 files changed, 15 insertions, 12 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 91b19976..d6b8bb28 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/3, terminate/2, delete_and_terminate/1, publish/4,
+-export([init/4, terminate/2, delete_and_terminate/1, publish/4,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -193,7 +193,7 @@
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
--spec(init/3 :: (rabbit_amqqueue:name(), boolean(),
+-spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(),
fun ((rabbit_guid:guid()) -> boolean())) ->
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
@@ -220,8 +220,8 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, MsgStoreRecovered, ContainsCheckFun) ->
- State = #qistate { dir = Dir } = blank_state(Name),
+init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name, not Recover),
Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
@@ -356,9 +356,14 @@ recover(DurableQueues) ->
%% startup and shutdown
%%----------------------------------------------------------------------------
-blank_state(QueueName) ->
+blank_state(QueueName, EnsureFresh) ->
StrName = queue_name_to_dir_name(QueueName),
Dir = filename:join(queues_dir(), StrName),
+ ok = case EnsureFresh of
+ true -> false = filelib:is_file(Dir), %% is_file == is file or dir
+ ok;
+ false -> ok
+ end,
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
@@ -463,9 +468,7 @@ recover_message(false, _, no_del, RelSeq, Segment) ->
add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- Bin = term_to_binary(Name),
- Size = 8*size(Bin),
- <<Num:Size>> = Bin,
+ <<Num:128>> = erlang:md5(term_to_binary(Name)),
lists:flatten(io_lib:format("~.36B", [Num])).
queues_dir() ->
@@ -497,7 +500,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName)),
+ recover_journal(blank_state(QueueName, false)),
[ok = segment_entries_foldr(
fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) ->
gatherer:in(Gatherer, {Guid, 1});
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 516e9134..0b92682a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1546,7 +1546,7 @@ test_queue() ->
init_test_queue() ->
rabbit_queue_index:init(
- test_queue(), false,
+ test_queue(), true, false,
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 92ffc511..0f52eee8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -368,10 +368,10 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, _Recover) ->
+init(QueueName, IsDurable, Recover) ->
{DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(
- QueueName,
+ QueueName, Recover,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)