diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-19 18:58:45 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-19 18:58:45 +0000 |
commit | db3848bb726db4019c8e595e72109e3daa81ab7e (patch) | |
tree | 9e39064aae3b59fb63c41ab4c21f4cee2b93f044 | |
parent | a4891ce1d6006c6f36c8408d96028b7b3ee35be9 (diff) | |
parent | c1725533189aa08b5ceeff3f92281850de678760 (diff) | |
download | rabbitmq-server-db3848bb726db4019c8e595e72109e3daa81ab7e.tar.gz |
merge bug25409 into bug25394
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 17 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_invalid.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 2 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 7 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 26 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 150 |
9 files changed, 131 insertions, 85 deletions
@@ -162,7 +162,7 @@ $(BASIC_PLT): $(BEAM_TARGETS) else \ dialyzer --output_plt $@ --build_plt \ --apps erts kernel stdlib compiler sasl os_mon mnesia tools \ - public_key crypto ssl; \ + public_key crypto ssl xmerl; \ fi clean: diff --git a/src/rabbit.erl b/src/rabbit.erl index 7b8348fc..16694105 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -533,6 +533,9 @@ sort_boot_steps(UnsortedSteps) -> end]) end. +-ifdef(use_specs). +-spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()). +-endif. boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> AllNodes = rabbit_mnesia:cluster_nodes(all), {Err, Nodes} = @@ -552,13 +555,15 @@ boot_error(Reason, Stacktrace) -> Args = [Reason, log_location(kernel), log_location(sasl)], boot_error(Reason, Fmt, Args, Stacktrace). +-ifdef(use_specs). +-spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()]) + -> no_return()). +-endif. +boot_error(Reason, Fmt, Args, not_available) -> + basic_boot_error(Reason, Fmt, Args); boot_error(Reason, Fmt, Args, Stacktrace) -> - case Stacktrace of - not_available -> basic_boot_error(Reason, Fmt, Args); - _ -> basic_boot_error(Reason, Fmt ++ - "Stack trace:~n ~p~n~n", - Args ++ [Stacktrace]) - end. + basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n", + Args ++ [Stacktrace]). basic_boot_error(Reason, Format, Args) -> io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2b43c8ba..4245f7e2 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -168,7 +168,7 @@ %% results, leaving the queue undisturbed. -callback fold(fun((rabbit_types:basic_message(), rabbit_types:message_properties(), - A) -> {('stop' | 'cont'), A}), + boolean(), A) -> {('stop' | 'cont'), A}), A, state()) -> {A, state()}. %% How long is my queue? diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 88e3dfc5..2b89be8f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -412,8 +412,14 @@ handle_exception(Reason, State = #ch{protocol = Protocol, {stop, normal, State1} end. +-ifdef(use_specs). +-spec(precondition_failed/1 :: (string()) -> no_return()). +-endif. precondition_failed(Format) -> precondition_failed(Format, []). +-ifdef(use_specs). +-spec(precondition_failed/2 :: (string(), [any()]) -> no_return()). +-endif. precondition_failed(Format, Params) -> rabbit_misc:protocol_error(precondition_failed, Format, Params). diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index 101fe434..c5d781c2 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -31,6 +31,10 @@ description() -> serialise_events() -> false. +-ifdef(use_specs). +-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> no_return()). +-endif. route(#exchange{name = Name, type = Type}, _) -> rabbit_misc:protocol_error( precondition_failed, diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index f2ab67cd..4d6b1fc9 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -91,7 +91,7 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> end. master_go0(Args, BQ, BQS) -> - case BQ:fold(fun (Msg, MsgProps, Acc) -> + case BQ:fold(fun (Msg, MsgProps, false, Acc) -> master_send(Msg, MsgProps, Args, Acc) end, {0, erlang:now()}, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 13459350..ae832749 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1007,7 +1007,12 @@ emit_stats(State) -> rabbit_event:reset_stats_timer(State, #v1.stats_timer). %% 1.0 stub - +-ifdef(use_specs). +-spec(become_1_0/3 :: ('amqp' | 'sasl', + {non_neg_integer(), non_neg_integer(), + non_neg_integer(), non_neg_integer()}, + #v1{}) -> no_return()). +-endif. become_1_0(Mode, Version, State = #v1{sock = Sock}) -> case code:is_loaded(rabbit_amqp1_0_reader) of false -> refuse_connection(Sock, {bad_version, Version}); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7bd8d541..c47f2772 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2329,17 +2329,20 @@ test_variable_queue() -> passed. test_variable_queue_fold(VQ0) -> - {RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), - Count = rabbit_variable_queue:len(VQ1), - Msgs = RequeuedMsgs ++ FreshMsgs, - lists:foldl( - fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end, - VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). - -test_variable_queue_fold(Cut, Msgs, VQ0) -> + {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), + Count = rabbit_variable_queue:depth(VQ1), + Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs), + lists:foldl(fun (Cut, VQ2) -> + test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2) + end, VQ1, [0, 1, 2, Count div 2, + Count - 1, Count, Count + 1, Count * 2]). + +test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) -> {Acc, VQ1} = rabbit_variable_queue:fold( - fun (M, _, A) -> + fun (M, _, Pending, A) -> MInt = msg2int(M), + Pending = lists:member(MInt, PendingMsgs), %% assert case MInt =< Cut of true -> {cont, [MInt | A]}; false -> {stop, A} @@ -2400,10 +2403,11 @@ variable_queue_with_holes(VQ0) -> Depth = rabbit_variable_queue:depth(VQ8), Len = Depth - length(Subset3), Len = rabbit_variable_queue:len(VQ8), - {(Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}. + {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}. test_variable_queue_requeue(VQ0) -> - {RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), + {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), Msgs = lists:zip(RequeuedMsgs, lists:duplicate(length(RequeuedMsgs), true)) ++ diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7e09e5e3..34a4b52f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -678,25 +678,12 @@ ackfold(MsgFun, Acc, State, AckTags) -> end, {Acc, State}, AckTags), {AccN, a(StateN)}. -fold(Fun, Acc, #vqstate { q1 = Q1, - q2 = Q2, - delta = #delta { start_seq_id = DeltaSeqId, - end_seq_id = DeltaSeqIdEnd }, - q3 = Q3, - q4 = Q4 } = State) -> - QFun = fun(MsgStatus, {Acc0, State0}) -> - {Msg, State1} = read_msg(MsgStatus, State0), - {StopGo, AccNext} = - Fun(Msg, MsgStatus#msg_status.msg_props, Acc0), - {StopGo, {AccNext, State1}} - end, - {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4), - {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3), - {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2}, - DeltaSeqId, DeltaSeqIdEnd, State2), - {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2), - {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1), - {Acc5, State5}. +fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> + {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, + [msg_iterator(State), + disk_ack_iterator(State), + ram_ack_iterator(State)]), + ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). len(#vqstate { len = Len }) -> Len. @@ -1103,14 +1090,16 @@ queue_out(State = #vqstate { q4 = Q4 }) -> read_msg(#msg_status{msg = undefined, msg_id = MsgId, - is_persistent = IsPersistent}, - State = #vqstate{msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate {msg_store_clients = MSCState1}}; + is_persistent = IsPersistent}, State) -> + read_msg(MsgId, IsPersistent, State); read_msg(#msg_status{msg = Msg}, State) -> {Msg, State}. +read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {Msg, State #vqstate {msg_store_clients = MSCState1}}. + inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> State#vqstate{ram_msg_count = RamMsgCount + 1}. @@ -1391,7 +1380,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- -%% Internal plumbing for requeue and fold +%% Internal plumbing for requeue %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> @@ -1461,48 +1450,81 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. -qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A; -qfoldl( Fun, {cont, Acc} = A, Q) -> +%%---------------------------------------------------------------------------- +%% Iterator +%%---------------------------------------------------------------------------- + +ram_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}. + +disk_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. + +msg_iterator(State) -> istate(start, State). + +istate(start, State) -> {q4, State#vqstate.q4, State}; +istate(q4, State) -> {q3, State#vqstate.q3, State}; +istate(q3, State) -> {delta, State#vqstate.delta, State}; +istate(delta, State) -> {q2, State#vqstate.q2, State}; +istate(q2, State) -> {q1, State#vqstate.q1, State}; +istate(q1, _State) -> done. + +next({ack, It}, IndexState) -> + case gb_trees:next(It) of + none -> {empty, IndexState}; + {_SeqId, MsgStatus, It1} -> Next = {ack, It1}, + {value, MsgStatus, true, Next, IndexState} + end; +next(done, IndexState) -> {empty, IndexState}; +next({delta, #delta{start_seq_id = SeqId, + end_seq_id = SeqId}, State}, IndexState) -> + next(istate(delta, State), IndexState); +next({delta, #delta{start_seq_id = SeqId, + end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> + SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), + SeqId1 = lists:min([SeqIdB, SeqIdEnd]), + {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), + next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); +next({delta, Delta, [], State}, IndexState) -> + next({delta, Delta, State}, IndexState); +next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> + case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse + gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of + false -> Next = {delta, Delta, Rest, State}, + {value, beta_msg_status(M), false, Next, IndexState}; + true -> next({delta, Delta, Rest, State}, IndexState) + end; +next({Key, Q, State}, IndexState) -> case ?QUEUE:out(Q) of - {empty, _Q} -> A; - {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1) + {empty, _Q} -> next(istate(Key, State), IndexState); + {{value, MsgStatus}, QN} -> Next = {Key, QN, State}, + {value, MsgStatus, false, Next, IndexState} end. -lfoldl(_Fun, {stop, _Acc} = A, _L) -> A; -lfoldl(_Fun, {cont, _Acc} = A, []) -> A; -lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T). - -delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> - {stop, {Acc, State}}; -delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> - {cont, {Acc, State}}; -delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, - #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - index_state = IndexState, - msg_store_clients = MSCState } = State) -> - DeltaSeqId1 = lists:min( - [rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, - IndexState), - {StopCont, {Acc1, MSCState1}} = - lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered}, - {Acc0, MSCState0}) -> - case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA)) of - false -> {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState0, IsPersistent, - MsgId), - {StopCont, AccNext} = - Fun(Msg, MsgProps, Acc0), - {StopCont, {AccNext, MSCState1}}; - true -> {cont, {Acc0, MSCState0}} - end - end, {cont, {Acc, MSCState}}, List), - delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, - State #vqstate { index_state = IndexState1, - msg_store_clients = MSCState1 }). +inext(It, {Its, IndexState}) -> + case next(It, IndexState) of + {empty, IndexState1} -> + {Its, IndexState1}; + {value, MsgStatus1, Unacked, It1, IndexState1} -> + {[{MsgStatus1, Unacked, It1} | Its], IndexState1} + end. + +ifold(_Fun, Acc, [], State) -> + {Acc, State}; +ifold(Fun, Acc, Its, State) -> + [{MsgStatus, Unacked, It} | Rest] = + lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _}, + {#msg_status{seq_id = SeqId2}, _, _}) -> + SeqId1 =< SeqId2 + end, Its), + {Msg, State1} = read_msg(MsgStatus, State), + case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of + {stop, Acc1} -> + {Acc1, State}; + {cont, Acc1} -> + {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}), + ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1}) + end. %%---------------------------------------------------------------------------- %% Phase changes |