summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-28 15:21:00 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-28 15:21:00 +0100
commit538c8e830c672afbf1e12d5b1c3b95fd2f9c49bf (patch)
treeea4d13e5130286e72cf32ea323954c13858b55f8
parentb7ad7e9c73f4688783d155acd66a9cd92b292291 (diff)
parenta77d43aff8379b35f33b32a22355bdae8d101af5 (diff)
downloadrabbitmq-server-538c8e830c672afbf1e12d5b1c3b95fd2f9c49bf.tar.gz
Merge bug21922 into default.
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_queue_index.erl21
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl4
4 files changed, 16 insertions, 13 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c4db3ace..dafc3075 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -967,7 +967,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
end;
{empty, _} ->
rabbit_misc:protocol_error(
- not_found, "unknown delivery tag ~w", [DeliveryTag])
+ precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
end.
add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 91b19976..d6b8bb28 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/3, terminate/2, delete_and_terminate/1, publish/4,
+-export([init/4, terminate/2, delete_and_terminate/1, publish/4,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -193,7 +193,7 @@
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
--spec(init/3 :: (rabbit_amqqueue:name(), boolean(),
+-spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(),
fun ((rabbit_guid:guid()) -> boolean())) ->
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
@@ -220,8 +220,8 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, MsgStoreRecovered, ContainsCheckFun) ->
- State = #qistate { dir = Dir } = blank_state(Name),
+init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name, not Recover),
Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
@@ -356,9 +356,14 @@ recover(DurableQueues) ->
%% startup and shutdown
%%----------------------------------------------------------------------------
-blank_state(QueueName) ->
+blank_state(QueueName, EnsureFresh) ->
StrName = queue_name_to_dir_name(QueueName),
Dir = filename:join(queues_dir(), StrName),
+ ok = case EnsureFresh of
+ true -> false = filelib:is_file(Dir), %% is_file == is file or dir
+ ok;
+ false -> ok
+ end,
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
@@ -463,9 +468,7 @@ recover_message(false, _, no_del, RelSeq, Segment) ->
add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- Bin = term_to_binary(Name),
- Size = 8*size(Bin),
- <<Num:Size>> = Bin,
+ <<Num:128>> = erlang:md5(term_to_binary(Name)),
lists:flatten(io_lib:format("~.36B", [Num])).
queues_dir() ->
@@ -497,7 +500,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName)),
+ recover_journal(blank_state(QueueName, false)),
[ok = segment_entries_foldr(
fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) ->
gatherer:in(Gatherer, {Guid, 1});
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 516e9134..0b92682a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1546,7 +1546,7 @@ test_queue() ->
init_test_queue() ->
rabbit_queue_index:init(
- test_queue(), false,
+ test_queue(), true, false,
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 92ffc511..0f52eee8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -368,10 +368,10 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, _Recover) ->
+init(QueueName, IsDurable, Recover) ->
{DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(
- QueueName,
+ QueueName, Recover,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)