summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-19 18:58:45 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-19 18:58:45 +0000
commitdb3848bb726db4019c8e595e72109e3daa81ab7e (patch)
tree9e39064aae3b59fb63c41ab4c21f4cee2b93f044
parenta4891ce1d6006c6f36c8408d96028b7b3ee35be9 (diff)
parentc1725533189aa08b5ceeff3f92281850de678760 (diff)
downloadrabbitmq-server-db3848bb726db4019c8e595e72109e3daa81ab7e.tar.gz
merge bug25409 into bug25394
-rw-r--r--Makefile2
-rw-r--r--src/rabbit.erl17
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_exchange_type_invalid.erl4
-rw-r--r--src/rabbit_mirror_queue_sync.erl2
-rw-r--r--src/rabbit_reader.erl7
-rw-r--r--src/rabbit_tests.erl26
-rw-r--r--src/rabbit_variable_queue.erl150
9 files changed, 131 insertions, 85 deletions
diff --git a/Makefile b/Makefile
index c63e3dfd..bf33b931 100644
--- a/Makefile
+++ b/Makefile
@@ -162,7 +162,7 @@ $(BASIC_PLT): $(BEAM_TARGETS)
else \
dialyzer --output_plt $@ --build_plt \
--apps erts kernel stdlib compiler sasl os_mon mnesia tools \
- public_key crypto ssl; \
+ public_key crypto ssl xmerl; \
fi
clean:
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7b8348fc..16694105 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -533,6 +533,9 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+-ifdef(use_specs).
+-spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()).
+-endif.
boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
{Err, Nodes} =
@@ -552,13 +555,15 @@ boot_error(Reason, Stacktrace) ->
Args = [Reason, log_location(kernel), log_location(sasl)],
boot_error(Reason, Fmt, Args, Stacktrace).
+-ifdef(use_specs).
+-spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()])
+ -> no_return()).
+-endif.
+boot_error(Reason, Fmt, Args, not_available) ->
+ basic_boot_error(Reason, Fmt, Args);
boot_error(Reason, Fmt, Args, Stacktrace) ->
- case Stacktrace of
- not_available -> basic_boot_error(Reason, Fmt, Args);
- _ -> basic_boot_error(Reason, Fmt ++
- "Stack trace:~n ~p~n~n",
- Args ++ [Stacktrace])
- end.
+ basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n",
+ Args ++ [Stacktrace]).
basic_boot_error(Reason, Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2b43c8ba..4245f7e2 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -168,7 +168,7 @@
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
rabbit_types:message_properties(),
- A) -> {('stop' | 'cont'), A}),
+ boolean(), A) -> {('stop' | 'cont'), A}),
A, state()) -> {A, state()}.
%% How long is my queue?
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 88e3dfc5..2b89be8f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -412,8 +412,14 @@ handle_exception(Reason, State = #ch{protocol = Protocol,
{stop, normal, State1}
end.
+-ifdef(use_specs).
+-spec(precondition_failed/1 :: (string()) -> no_return()).
+-endif.
precondition_failed(Format) -> precondition_failed(Format, []).
+-ifdef(use_specs).
+-spec(precondition_failed/2 :: (string(), [any()]) -> no_return()).
+-endif.
precondition_failed(Format, Params) ->
rabbit_misc:protocol_error(precondition_failed, Format, Params).
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 101fe434..c5d781c2 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -31,6 +31,10 @@ description() ->
serialise_events() -> false.
+-ifdef(use_specs).
+-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> no_return()).
+-endif.
route(#exchange{name = Name, type = Type}, _) ->
rabbit_misc:protocol_error(
precondition_failed,
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index f2ab67cd..4d6b1fc9 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -91,7 +91,7 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
end.
master_go0(Args, BQ, BQS) ->
- case BQ:fold(fun (Msg, MsgProps, Acc) ->
+ case BQ:fold(fun (Msg, MsgProps, false, Acc) ->
master_send(Msg, MsgProps, Args, Acc)
end, {0, erlang:now()}, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 13459350..ae832749 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -1007,7 +1007,12 @@ emit_stats(State) ->
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
%% 1.0 stub
-
+-ifdef(use_specs).
+-spec(become_1_0/3 :: ('amqp' | 'sasl',
+ {non_neg_integer(), non_neg_integer(),
+ non_neg_integer(), non_neg_integer()},
+ #v1{}) -> no_return()).
+-endif.
become_1_0(Mode, Version, State = #v1{sock = Sock}) ->
case code:is_loaded(rabbit_amqp1_0_reader) of
false -> refuse_connection(Sock, {bad_version, Version});
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7bd8d541..c47f2772 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2329,17 +2329,20 @@ test_variable_queue() ->
passed.
test_variable_queue_fold(VQ0) ->
- {RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
- Count = rabbit_variable_queue:len(VQ1),
- Msgs = RequeuedMsgs ++ FreshMsgs,
- lists:foldl(
- fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end,
- VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
-
-test_variable_queue_fold(Cut, Msgs, VQ0) ->
+ {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
+ Count = rabbit_variable_queue:depth(VQ1),
+ Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs),
+ lists:foldl(fun (Cut, VQ2) ->
+ test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2)
+ end, VQ1, [0, 1, 2, Count div 2,
+ Count - 1, Count, Count + 1, Count * 2]).
+
+test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) ->
{Acc, VQ1} = rabbit_variable_queue:fold(
- fun (M, _, A) ->
+ fun (M, _, Pending, A) ->
MInt = msg2int(M),
+ Pending = lists:member(MInt, PendingMsgs), %% assert
case MInt =< Cut of
true -> {cont, [MInt | A]};
false -> {stop, A}
@@ -2400,10 +2403,11 @@ variable_queue_with_holes(VQ0) ->
Depth = rabbit_variable_queue:depth(VQ8),
Len = Depth - length(Subset3),
Len = rabbit_variable_queue:len(VQ8),
- {(Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}.
+ {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}.
test_variable_queue_requeue(VQ0) ->
- {RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
+ {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
Msgs =
lists:zip(RequeuedMsgs,
lists:duplicate(length(RequeuedMsgs), true)) ++
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7e09e5e3..34a4b52f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -678,25 +678,12 @@ ackfold(MsgFun, Acc, State, AckTags) ->
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
-fold(Fun, Acc, #vqstate { q1 = Q1,
- q2 = Q2,
- delta = #delta { start_seq_id = DeltaSeqId,
- end_seq_id = DeltaSeqIdEnd },
- q3 = Q3,
- q4 = Q4 } = State) ->
- QFun = fun(MsgStatus, {Acc0, State0}) ->
- {Msg, State1} = read_msg(MsgStatus, State0),
- {StopGo, AccNext} =
- Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
- {StopGo, {AccNext, State1}}
- end,
- {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
- {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3),
- {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2},
- DeltaSeqId, DeltaSeqIdEnd, State2),
- {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2),
- {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1),
- {Acc5, State5}.
+fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
+ {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
+ [msg_iterator(State),
+ disk_ack_iterator(State),
+ ram_ack_iterator(State)]),
+ ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
@@ -1103,14 +1090,16 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
read_msg(#msg_status{msg = undefined,
msg_id = MsgId,
- is_persistent = IsPersistent},
- State = #vqstate{msg_store_clients = MSCState}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, MsgId),
- {Msg, State #vqstate {msg_store_clients = MSCState1}};
+ is_persistent = IsPersistent}, State) ->
+ read_msg(MsgId, IsPersistent, State);
read_msg(#msg_status{msg = Msg}, State) ->
{Msg, State}.
+read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ {Msg, State #vqstate {msg_store_clients = MSCState1}}.
+
inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
State#vqstate{ram_msg_count = RamMsgCount + 1}.
@@ -1391,7 +1380,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue and fold
+%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
@@ -1461,48 +1450,81 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
-qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A;
-qfoldl( Fun, {cont, Acc} = A, Q) ->
+%%----------------------------------------------------------------------------
+%% Iterator
+%%----------------------------------------------------------------------------
+
+ram_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
+
+disk_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+
+msg_iterator(State) -> istate(start, State).
+
+istate(start, State) -> {q4, State#vqstate.q4, State};
+istate(q4, State) -> {q3, State#vqstate.q3, State};
+istate(q3, State) -> {delta, State#vqstate.delta, State};
+istate(delta, State) -> {q2, State#vqstate.q2, State};
+istate(q2, State) -> {q1, State#vqstate.q1, State};
+istate(q1, _State) -> done.
+
+next({ack, It}, IndexState) ->
+ case gb_trees:next(It) of
+ none -> {empty, IndexState};
+ {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
+ {value, MsgStatus, true, Next, IndexState}
+ end;
+next(done, IndexState) -> {empty, IndexState};
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqId}, State}, IndexState) ->
+ next(istate(delta, State), IndexState);
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
+ SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
+ SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
+next({delta, Delta, [], State}, IndexState) ->
+ next({delta, Delta, State}, IndexState);
+next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
+ case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
+ gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
+ false -> Next = {delta, Delta, Rest, State},
+ {value, beta_msg_status(M), false, Next, IndexState};
+ true -> next({delta, Delta, Rest, State}, IndexState)
+ end;
+next({Key, Q, State}, IndexState) ->
case ?QUEUE:out(Q) of
- {empty, _Q} -> A;
- {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1)
+ {empty, _Q} -> next(istate(Key, State), IndexState);
+ {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
+ {value, MsgStatus, false, Next, IndexState}
end.
-lfoldl(_Fun, {stop, _Acc} = A, _L) -> A;
-lfoldl(_Fun, {cont, _Acc} = A, []) -> A;
-lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T).
-
-delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
- {stop, {Acc, State}};
-delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
- {cont, {Acc, State}};
-delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
- #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- index_state = IndexState,
- msg_store_clients = MSCState } = State) ->
- DeltaSeqId1 = lists:min(
- [rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
- IndexState),
- {StopCont, {Acc1, MSCState1}} =
- lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered},
- {Acc0, MSCState0}) ->
- case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA)) of
- false -> {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState0, IsPersistent,
- MsgId),
- {StopCont, AccNext} =
- Fun(Msg, MsgProps, Acc0),
- {StopCont, {AccNext, MSCState1}};
- true -> {cont, {Acc0, MSCState0}}
- end
- end, {cont, {Acc, MSCState}}, List),
- delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
- State #vqstate { index_state = IndexState1,
- msg_store_clients = MSCState1 }).
+inext(It, {Its, IndexState}) ->
+ case next(It, IndexState) of
+ {empty, IndexState1} ->
+ {Its, IndexState1};
+ {value, MsgStatus1, Unacked, It1, IndexState1} ->
+ {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
+ end.
+
+ifold(_Fun, Acc, [], State) ->
+ {Acc, State};
+ifold(Fun, Acc, Its, State) ->
+ [{MsgStatus, Unacked, It} | Rest] =
+ lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
+ {#msg_status{seq_id = SeqId2}, _, _}) ->
+ SeqId1 =< SeqId2
+ end, Its),
+ {Msg, State1} = read_msg(MsgStatus, State),
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
+ {stop, Acc1} ->
+ {Acc1, State};
+ {cont, Acc1} ->
+ {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}),
+ ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1})
+ end.
%%----------------------------------------------------------------------------
%% Phase changes