diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-13 16:34:50 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-02-13 16:34:50 +0000 |
commit | f1ddbda5b10f395f9bdb9bfa094012369df0689b (patch) | |
tree | ae86902044cd83c806361e06d77b4456bc553681 | |
parent | 027cab06ff9840e7a2fd04284da908a24b6bd66b (diff) | |
parent | 6fa897cc4c8629b62673f81be06e073ed4e45e23 (diff) | |
download | rabbitmq-server-f1ddbda5b10f395f9bdb9bfa094012369df0689b.tar.gz |
merge bug25853 into default
-rw-r--r-- | src/rabbit_amqqueue.erl | 43 | ||||
-rw-r--r-- | src/rabbit_file.erl | 9 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 63 |
3 files changed, 66 insertions, 49 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c0478579..019cebe6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -221,36 +221,37 @@ start(Qs) -> find_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( + mnesia:async_dirty( fun () -> qlc:e(qlc:q([Q || Q = #amqqueue{name = Name, pid = Pid} <- mnesia:table(rabbit_durable_queue), - mnesia:read(rabbit_queue, Name, read) =:= [], - node(Pid) == Node])) + node(Pid) == Node, + mnesia:read(rabbit_queue, Name, read) =:= []])) end). recover_durable_queues(QueuesAndRecoveryTerms) -> - Qs = [{start_queue_process(node(), Q), Terms} || - {Q, Terms} <- QueuesAndRecoveryTerms], - [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs, - gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}]. + {Results, Failures} = + gen_server2:mcall([{start_queue_process(node(), Q), + {init, {self(), Terms}}} || + {Q, Terms} <- QueuesAndRecoveryTerms]), + [rabbit_log:error("Queue ~p failed to initialise: ~p~n", + [Pid, Error]) || {Pid, Error} <- Failures], + [Q || {_, {new, Q}} <- Results]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q0 = rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - gm_pids = []}), - {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), - Q1 = start_queue_process(Node, Q0), - gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity). + Q = rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + gm_pids = []}), + {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -313,7 +314,7 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), - Q#amqqueue{pid = Pid}. + Pid. add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 1a766b05..4658ecfd 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -94,9 +94,12 @@ ensure_dir_internal(File) -> end. wildcard(Pattern, Dir) -> - {ok, Files} = list_dir(Dir), - {ok, RE} = re:compile(Pattern, [anchored]), - [File || File <- Files, match =:= re:run(File, RE, [{capture, none}])]. + case list_dir(Dir) of + {ok, Files} -> {ok, RE} = re:compile(Pattern, [anchored]), + [File || File <- Files, + match =:= re:run(File, RE, [{capture, none}])]; + {error, _} -> [] + end. list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 919b7376..e00508b4 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -424,22 +424,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% and the journal. State1 = #qistate { dir = Dir, segments = Segments } = recover_journal(State), - {Segments1, Count} = + {Segments1, Count, DirtyCount} = %% Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we %% lose them. Also mark delivered if not clean shutdown. Also - %% find the number of unacked messages. + %% find the number of unacked messages. Also accumulate the + %% dirty count here, so we can call maybe_flush_journal below + %% and avoid unnecessary file system operations. lists:foldl( - fun (Seg, {Segments2, CountAcc}) -> - Segment = #segment { unacked = UnackedCount } = + fun (Seg, {Segments2, CountAcc, DirtyCount}) -> + {Segment = #segment { unacked = UnackedCount }, Dirty} = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), - {segment_store(Segment, Segments2), CountAcc + UnackedCount} - end, {Segments, 0}, all_segment_nums(State1)), - %% Unconditionally flush since the dirty_count doesn't get updated - %% by the above foldl. - State2 = flush_journal(State1 #qistate { segments = Segments1 }), + {segment_store(Segment, Segments2), + CountAcc + UnackedCount, DirtyCount + Dirty} + end, {Segments, 0, 0}, all_segment_nums(State1)), + State2 = maybe_flush_journal(State1 #qistate { segments = Segments1, + dirty_count = DirtyCount }), {Count, State2}. terminate(State = #qistate { journal_handle = JournalHdl, @@ -463,23 +465,25 @@ recover_segment(ContainsCheckFun, CleanShutdown, segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack}, - Segment1) -> + SegmentAndDirtyCount) -> recover_message(ContainsCheckFun(MsgId), CleanShutdown, - Del, RelSeq, Segment1) + Del, RelSeq, SegmentAndDirtyCount) end, - Segment #segment { unacked = UnackedCount + UnackedCountDelta }, + {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, SegEntries1). -recover_message( true, true, _Del, _RelSeq, Segment) -> - Segment; -recover_message( true, false, del, _RelSeq, Segment) -> - Segment; -recover_message( true, false, no_del, RelSeq, Segment) -> - add_to_journal(RelSeq, del, Segment); -recover_message(false, _, del, RelSeq, Segment) -> - add_to_journal(RelSeq, ack, Segment); -recover_message(false, _, no_del, RelSeq, Segment) -> - add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). +recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) -> + SegmentAndDirtyCount; +recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount) -> + SegmentAndDirtyCount; +recover_message( true, false, no_del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, del, Segment), DirtyCount + 1}; +recover_message(false, _, del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1}; +recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, ack, + add_to_journal(RelSeq, del, Segment)), + DirtyCount + 2}. queue_name_to_dir_name(Name = #resource { kind = queue }) -> <<Num:128>> = erlang:md5(term_to_binary(Name)), @@ -651,9 +655,18 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> %% if you call it more than once on the same state. Assumes the counts %% are 0 to start with. load_journal(State) -> - {JournalHdl, State1} = get_journal_handle(State), - {ok, 0} = file_handle_cache:position(JournalHdl, 0), - load_journal_entries(State1). + case is_journal_present(State) of + true -> {JournalHdl, State1} = get_journal_handle(State), + {ok, 0} = file_handle_cache:position(JournalHdl, 0), + load_journal_entries(State1); + false -> State + end. + +is_journal_present(#qistate { journal_handle = undefined, + dir = Dir }) -> + rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)); +is_journal_present(_) -> + true. %% ditto recover_journal(State) -> |