summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-02-13 16:34:50 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-02-13 16:34:50 +0000
commitf1ddbda5b10f395f9bdb9bfa094012369df0689b (patch)
treeae86902044cd83c806361e06d77b4456bc553681
parent027cab06ff9840e7a2fd04284da908a24b6bd66b (diff)
parent6fa897cc4c8629b62673f81be06e073ed4e45e23 (diff)
downloadrabbitmq-server-f1ddbda5b10f395f9bdb9bfa094012369df0689b.tar.gz
merge bug25853 into default
-rw-r--r--src/rabbit_amqqueue.erl43
-rw-r--r--src/rabbit_file.erl9
-rw-r--r--src/rabbit_queue_index.erl63
3 files changed, 66 insertions, 49 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c0478579..019cebe6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -221,36 +221,37 @@ start(Qs) ->
find_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
+ mnesia:async_dirty(
fun () ->
qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
pid = Pid}
<- mnesia:table(rabbit_durable_queue),
- mnesia:read(rabbit_queue, Name, read) =:= [],
- node(Pid) == Node]))
+ node(Pid) == Node,
+ mnesia:read(rabbit_queue, Name, read) =:= []]))
end).
recover_durable_queues(QueuesAndRecoveryTerms) ->
- Qs = [{start_queue_process(node(), Q), Terms} ||
- {Q, Terms} <- QueuesAndRecoveryTerms],
- [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs,
- gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}].
+ {Results, Failures} =
+ gen_server2:mcall([{start_queue_process(node(), Q),
+ {init, {self(), Terms}}} ||
+ {Q, Terms} <- QueuesAndRecoveryTerms]),
+ [rabbit_log:error("Queue ~p failed to initialise: ~p~n",
+ [Pid, Error]) || {Pid, Error} <- Failures],
+ [Q || {_, {new, Q}} <- Results].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q0 = rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- gm_pids = []}),
- {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
- Q1 = start_queue_process(Node, Q0),
- gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity).
+ Q = rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ gm_pids = []}),
+ {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -313,7 +314,7 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1},
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
- Q#amqqueue{pid = Pid}.
+ Pid.
add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 1a766b05..4658ecfd 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -94,9 +94,12 @@ ensure_dir_internal(File) ->
end.
wildcard(Pattern, Dir) ->
- {ok, Files} = list_dir(Dir),
- {ok, RE} = re:compile(Pattern, [anchored]),
- [File || File <- Files, match =:= re:run(File, RE, [{capture, none}])].
+ case list_dir(Dir) of
+ {ok, Files} -> {ok, RE} = re:compile(Pattern, [anchored]),
+ [File || File <- Files,
+ match =:= re:run(File, RE, [{capture, none}])];
+ {error, _} -> []
+ end.
list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 919b7376..e00508b4 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -424,22 +424,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% and the journal.
State1 = #qistate { dir = Dir, segments = Segments } =
recover_journal(State),
- {Segments1, Count} =
+ {Segments1, Count, DirtyCount} =
%% Load each segment in turn and filter out messages that are
%% not in the msg_store, by adding acks to the journal. These
%% acks only go to the RAM journal as it doesn't matter if we
%% lose them. Also mark delivered if not clean shutdown. Also
- %% find the number of unacked messages.
+ %% find the number of unacked messages. Also accumulate the
+ %% dirty count here, so we can call maybe_flush_journal below
+ %% and avoid unnecessary file system operations.
lists:foldl(
- fun (Seg, {Segments2, CountAcc}) ->
- Segment = #segment { unacked = UnackedCount } =
+ fun (Seg, {Segments2, CountAcc, DirtyCount}) ->
+ {Segment = #segment { unacked = UnackedCount }, Dirty} =
recover_segment(ContainsCheckFun, CleanShutdown,
segment_find_or_new(Seg, Dir, Segments2)),
- {segment_store(Segment, Segments2), CountAcc + UnackedCount}
- end, {Segments, 0}, all_segment_nums(State1)),
- %% Unconditionally flush since the dirty_count doesn't get updated
- %% by the above foldl.
- State2 = flush_journal(State1 #qistate { segments = Segments1 }),
+ {segment_store(Segment, Segments2),
+ CountAcc + UnackedCount, DirtyCount + Dirty}
+ end, {Segments, 0, 0}, all_segment_nums(State1)),
+ State2 = maybe_flush_journal(State1 #qistate { segments = Segments1,
+ dirty_count = DirtyCount }),
{Count, State2}.
terminate(State = #qistate { journal_handle = JournalHdl,
@@ -463,23 +465,25 @@ recover_segment(ContainsCheckFun, CleanShutdown,
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack},
- Segment1) ->
+ SegmentAndDirtyCount) ->
recover_message(ContainsCheckFun(MsgId), CleanShutdown,
- Del, RelSeq, Segment1)
+ Del, RelSeq, SegmentAndDirtyCount)
end,
- Segment #segment { unacked = UnackedCount + UnackedCountDelta },
+ {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0},
SegEntries1).
-recover_message( true, true, _Del, _RelSeq, Segment) ->
- Segment;
-recover_message( true, false, del, _RelSeq, Segment) ->
- Segment;
-recover_message( true, false, no_del, RelSeq, Segment) ->
- add_to_journal(RelSeq, del, Segment);
-recover_message(false, _, del, RelSeq, Segment) ->
- add_to_journal(RelSeq, ack, Segment);
-recover_message(false, _, no_del, RelSeq, Segment) ->
- add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
+recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) ->
+ SegmentAndDirtyCount;
+recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount) ->
+ SegmentAndDirtyCount;
+recover_message( true, false, no_del, RelSeq, {Segment, DirtyCount}) ->
+ {add_to_journal(RelSeq, del, Segment), DirtyCount + 1};
+recover_message(false, _, del, RelSeq, {Segment, DirtyCount}) ->
+ {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
+recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
+ {add_to_journal(RelSeq, ack,
+ add_to_journal(RelSeq, del, Segment)),
+ DirtyCount + 2}.
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
@@ -651,9 +655,18 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.
load_journal(State) ->
- {JournalHdl, State1} = get_journal_handle(State),
- {ok, 0} = file_handle_cache:position(JournalHdl, 0),
- load_journal_entries(State1).
+ case is_journal_present(State) of
+ true -> {JournalHdl, State1} = get_journal_handle(State),
+ {ok, 0} = file_handle_cache:position(JournalHdl, 0),
+ load_journal_entries(State1);
+ false -> State
+ end.
+
+is_journal_present(#qistate { journal_handle = undefined,
+ dir = Dir }) ->
+ rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME));
+is_journal_present(_) ->
+ true.
%% ditto
recover_journal(State) ->