summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-03-22 15:47:53 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-03-22 15:47:53 +0000
commited018f70fc43daac70e3c85e7347befde8153fc0 (patch)
treee46972a05fb1b8c6731fac61c71410ba1d4b096b
parentfb723dce3618250668e75c9da9a91fac4cc2f1fa (diff)
parentd9c055ce65f2d6c2626d23de8cc2d0bceb5aceea (diff)
downloadrabbitmq-server-ed018f70fc43daac70e3c85e7347befde8153fc0.tar.gz
merge default into bug24750
-rw-r--r--src/rabbit_amqqueue_process.erl119
-rw-r--r--src/rabbit_misc.erl13
2 files changed, 41 insertions, 91 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 41fc173b..afece3f3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -51,8 +51,7 @@
ttl,
ttl_timer_ref,
publish_seqno,
- unconfirmed_mq,
- unconfirmed_qm,
+ unconfirmed,
delayed_stop,
queue_monitors,
dlx,
@@ -138,8 +137,7 @@ init(Q) ->
dlx = undefined,
dlx_routing_key = undefined,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
@@ -164,8 +162,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
expiry_timer_ref = undefined,
ttl = undefined,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = MTC},
@@ -742,11 +739,9 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
end.
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- unconfirmed_mq = UMQ,
- dlx = DLX,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC,
+ dlx = DLX}) ->
{ok, _, QPids} =
rabbit_basic:publish(
rabbit_basic:delivery(
@@ -755,20 +750,9 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
State1 = lists:foldl(fun monitor_queue/2, State, QPids),
State2 = State1#q{publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS),
- cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
- _ -> State3 =
- lists:foldl(
- fun(QPid, State0 = #q{unconfirmed_qm = UQM}) ->
- UQM1 = rabbit_misc:gb_trees_set_insert(
- QPid, MsgSeqNo, UQM),
- State0#q{unconfirmed_qm = UQM1}
- end, State2, QPids),
- noreply(State3#q{
- unconfirmed_mq =
- gb_trees:insert(
- MsgSeqNo, {gb_sets:from_list(QPids),
- AckTag}, UMQ)})
+ [] -> cleanup_after_confirm([AckTag], State2);
+ _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
+ noreply(State2#q{unconfirmed = UC1})
end.
monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
@@ -787,64 +771,31 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
end.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
- unconfirmed_qm = UQM}) ->
+ unconfirmed = UC}) ->
case dict:find(QPid, QMons) of
error ->
noreply(State);
{ok, _} ->
rabbit_log:info("DLQ ~p (for ~s) died~n",
[QPid, rabbit_misc:rs(qname(State))]),
- State1 = State#q{queue_monitors = dict:erase(QPid, QMons)},
- case gb_trees:lookup(QPid, UQM) of
- none ->
- noreply(State1);
- {value, MsgSeqNosSet} ->
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> rabbit_log:warning(
- "Dead queue lost ~p messages~n",
- [gb_sets:size(MsgSeqNosSet)]);
- false -> ok
- end,
- handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1)
- end
+ {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
+ case (MsgSeqNoAckTags =/= [] andalso
+ rabbit_misc:is_abnormal_termination(Reason)) of
+ true -> rabbit_log:warning("Dead queue lost ~p messages~n",
+ [length(MsgSeqNoAckTags)]);
+ false -> ok
+ end,
+ cleanup_after_confirm(
+ [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State#q{queue_monitors = dict:erase(QPid, QMons),
+ unconfirmed = UC1})
end.
-handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckTags1, UMQ3} =
- lists:foldl(
- fun (MsgSeqNo, {AckTags, UMQ1}) ->
- {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1),
- QPids1 = gb_sets:delete(QPid, QPids),
- case gb_sets:is_empty(QPids1) of
- true -> {[AckTag | AckTags],
- gb_trees:delete(MsgSeqNo, UMQ1)};
- false -> {AckTags, gb_trees:update(
- MsgSeqNo, {QPids1, AckTag}, UMQ1)}
- end
- end, {[], UMQ}, MsgSeqNos),
- {_Guids, BQS1} = BQ:ack(AckTags1, BQS),
- MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM),
- gb_sets:from_list(MsgSeqNos)),
- State1 = case gb_sets:is_empty(MsgSeqNos1) of
- false -> State#q{
- unconfirmed_qm =
- gb_trees:update(QPid, MsgSeqNos1, UQM)};
- true -> demonitor_queue(
- QPid, State#q{
- unconfirmed_qm =
- gb_trees:delete(QPid, UQM)})
- end,
- cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
- backing_queue_state = BQS1}).
-
stop_later(Reason, State) ->
stop_later(Reason, undefined, noreply, State).
-stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
- case {gb_trees:is_empty(UMQ), Reply} of
+stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) ->
+ case {dtree:is_empty(UC), Reply} of
{true, noreply} ->
{stop, Reason, State};
{true, _} ->
@@ -853,16 +804,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
noreply(State#q{delayed_stop = {Reason, {From, Reply}}})
end.
-cleanup_after_confirm(State = #q{delayed_stop = DS,
- unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) andalso DS =/= undefined of
+cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
+ unconfirmed = UC,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case dtree:is_empty(UC) andalso DS =/= undefined of
true -> case DS of
{_, {_, noreply}} -> ok;
{_, {From, Reply}} -> gen_server2:reply(From, Reply)
end,
{Reason, _} = DS,
- {stop, Reason, State};
- false -> noreply(State)
+ {stop, Reason, State1};
+ false -> noreply(State1)
end.
already_been_here(_Delivery, #q{dlx = undefined}) ->
@@ -1243,8 +1198,14 @@ handle_call(force_event_refresh, _From,
end,
reply(ok, State).
-handle_cast({confirm, MsgSeqNos, QPid}, State) ->
- handle_confirm(MsgSeqNos, QPid, State);
+handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
+ {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ State1 = case dtree:is_defined(QPid, UC1) of
+ false -> demonitor_queue(QPid, State);
+ true -> State
+ end,
+ cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State1#q{unconfirmed = UC1});
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
noreply(State);
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ddf7f574..5b4eed37 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,8 +46,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([dict_cons/3, orddict_cons/3, gb_trees_cons/3,
- gb_trees_set_insert/3]).
+-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
@@ -176,7 +175,6 @@
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
--spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()).
-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
-spec(gb_trees_foreach/2 ::
(fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
@@ -717,15 +715,6 @@ gb_trees_cons(Key, Value, Tree) ->
none -> gb_trees:insert(Key, [Value], Tree)
end.
-gb_trees_set_insert(Key, Value, Tree) ->
- case gb_trees:lookup(Key, Tree) of
- {value, Values} ->
- Values1 = gb_sets:insert(Value, Values),
- gb_trees:update(Key, Values1, Tree);
- none ->
- gb_trees:insert(Key, gb_sets:singleton(Value), Tree)
- end.
-
gb_trees_fold(Fun, Acc, Tree) ->
gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))).