diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-04-24 13:00:04 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-04-24 13:00:04 +0100 |
commit | 4aea09ab8900c2be0b21a17214a2209121b4549b (patch) | |
tree | 6b63b270e96d802ec97a268f0c4c07d49802fa44 /src | |
parent | f31f2cfbdf1c6c2f1895110c4fb22835a0ea276b (diff) | |
parent | afe952206ba04359c334337321ada308fc7280a8 (diff) | |
download | rabbitmq-server-4aea09ab8900c2be0b21a17214a2209121b4549b.tar.gz |
Merge bug24848
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 119 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 15 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 7 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 41 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 17 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 5 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 39 |
10 files changed, 147 insertions, 116 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3caf728b..5701efeb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -696,12 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ }) -> Now = now_micros(), - BQS1 = BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - dead_letter_fun(expired, State), - BQS), + DLXFun = dead_letter_fun(expired, State), + ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + case DLXFun of + undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS), + BQS1; + _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS), + lists:foreach( + fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), + BQS1 + end, ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, @@ -717,6 +723,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +ack_if_no_dlx(AckTags, State = #q{dlx = undefined, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1}; +ack_if_no_dlx(_AckTags, State) -> + State. + dead_letter_fun(_Reason, #q{dlx = undefined}) -> undefined; dead_letter_fun(Reason, _State) -> @@ -724,22 +738,25 @@ dead_letter_fun(Reason, _State) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) end. -dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> - case rabbit_exchange:lookup(DLX) of - {error, not_found} -> noreply(State); - _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State) +dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) -> + DLMsg = #basic_message{exchange_name = XName} = + make_dead_letter_msg(Reason, Msg, State), + case rabbit_exchange:lookup(XName) of + {ok, X} -> + Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + {Queues, Cycles} = detect_dead_letter_cycles( + DLMsg, rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(Queues), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids; + {error, not_found} -> + [] end. -dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC, - dlx = DLX}) -> - {ok, _, QPids} = - rabbit_basic:publish( - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo)), +dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC}) -> + QPids = dead_letter_publish(Msg, Reason, State), State1 = State#q{queue_monitors = pmon:monitor_all( QPids, State#q.queue_monitors), publish_seqno = MsgSeqNo + 1}, @@ -797,56 +814,58 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -already_been_here(_Delivery, #q{dlx = undefined}) -> - false; -already_been_here(#delivery{message = #basic_message{content = Content}}, - State) -> +detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), - #resource{name = QueueName} = qname(State), + NoCycles = {Queues, []}, case Headers of undefined -> - false; + NoCycles; _ -> case rabbit_misc:table_lookup(Headers, <<"x-death">>) of {array, DeathTables} -> OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || {table, D} <- DeathTables], OldQueues1 = [QName || {longstr, QName} <- OldQueues], - case lists:member(QueueName, OldQueues1) of - true -> [QueueName | OldQueues1]; - _ -> false - end; + OldQueuesSet = ordsets:from_list(OldQueues1), + {Cycling, NotCycling} = + lists:partition( + fun(Queue) -> + ordsets:is_element(Queue#resource.name, + OldQueuesSet) + end, Queues), + {NotCycling, [[QName | OldQueues1] || + #resource{name = QName} <- Cycling]}; _ -> - false + NoCycles end end. -make_dead_letter_msg(DLX, Reason, +make_dead_letter_msg(Reason, Msg = #basic_message{content = Content, exchange_name = Exchange, routing_keys = RoutingKeys}, - State = #q{dlx_routing_key = DlxRoutingKey}) -> + State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) -> {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of undefined -> {RoutingKeys, fun (H) -> H end}; _ -> {[DlxRoutingKey], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, + ReasonBin = list_to_binary(atom_to_list(Reason)), #resource{name = QName} = qname(State), + TimeSec = rabbit_misc:now_ms() div 1000, HeadersFun2 = fun (Headers) -> %% The first routing key is the one specified in the %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = - [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, - longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}], HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, Info, Headers)) end, @@ -1196,8 +1215,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender, - msg_seq_no = MsgSeqNo}, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State = #q{senders = Senders}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. Senders1 = case Flow of @@ -1206,12 +1224,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, noflow -> Senders end, State1 = State#q{senders = Senders1}, - case already_been_here(Delivery, State1) of - false -> noreply(deliver_or_enqueue(Delivery, State1)); - Qs -> log_cycle_once(Qs), - rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), - noreply(State1) - end; + noreply(deliver_or_enqueue(Delivery, State1)); handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( @@ -1227,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> ChPid, AckTags, State, case Requeue of true -> fun (State1) -> requeue_and_run(AckTags, State1) end; - false -> Fun = dead_letter_fun(rejected, State), - fun (State1 = #q{backing_queue = BQ, + false -> fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Fun = dead_letter_fun(rejected, State1), BQS1 = BQ:fold(Fun, BQS, AckTags), - State1#q{backing_queue_state = BQS1} + ack_if_no_dlx( + AckTags, + State1#q{backing_queue_state = BQS1}) end end)); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 6cc1c3fd..28c57bb0 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -35,6 +35,7 @@ -type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | 'undefined'). +-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). %% Called on startup with a list of durable queue names. The queues %% aren't being started at this point, but this call allows the @@ -117,12 +118,14 @@ %% be ignored. -callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}. -%% Drop messages from the head of the queue while the supplied -%% predicate returns true. A callback function is supplied allowing -%% callers access to messages that are about to be dropped. --callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(), - state()) - -> state(). +%% Drop messages from the head of the queue while the supplied predicate returns +%% true. Also accepts a boolean parameter that determines whether the messages +%% necessitate an ack or not. If they do, the function returns a list of +%% messages with the respective acktags. +-callback dropwhile(msg_pred(), true, state()) + -> {[{rabbit_types:basic_message(), ack()}], state()}; + (msg_pred(), false, state()) + -> {undefined, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 286b69e4..a84800c0 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) -> {call, ?BQMOD, drain_confirmed, [BQ]}. qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}. + {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}. qc_is_empty(#state{bqstate = BQ}) -> {call, ?BQMOD, is_empty, [BQ]}. @@ -267,10 +267,11 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> BQ1 = {call, erlang, element, [2, Res]}, S#state{bqstate = BQ1}; -next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) -> +next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) -> + BQ = {call, erlang, element, [2, Res]}, #state{messages = Messages} = S, Msgs1 = drop_messages(Messages), - S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1}; + S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1}; next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) -> S; diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 8ad59016..17d848da 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -63,7 +63,7 @@ -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). --spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers())) +-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content()) -> rabbit_types:content()). -spec(header_routes/1 :: diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 846890a1..22c6a223 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,9 +36,9 @@ conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, queue_collector_pid, - stats_timer, confirm_enabled, publish_seqno, unconfirmed, - confirmed, capabilities, trace_state}). + consumer_mapping, blocking, queue_consumers, delivering_queues, + queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, + unconfirmed, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), + delivering_queues = sets:new(), queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, @@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = pmon:erase( - QPid, State3#ch.queue_monitors)}); + QPid, State4#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, _QPid, _MsgId, Redelivered, + Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, record_sent(none, not(NoAck), Msg, State)}; + State1 = monitor_delivering_queue(NoAck, QPid, State), + {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q} -> - State1 = State#ch{consumer_mapping = - dict:store(ActualConsumerTag, Q, - ConsumerMapping)}, + {ok, Q = #amqqueue{pid = QPid}} -> + CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), + State1 = monitor_delivering_queue( + NoAck, QPid, State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag, State end. +monitor_delivering_queue(true, _QPid, State) -> + State; +monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + delivering_queues = sets:add_element(QPid, DQ)}. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> case rabbit_misc:is_abnormal_termination(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), @@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid, State#ch{consumer_mapping = ConsumerMapping1, queue_consumers = dict:erase(QPid, QCons)}. +handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> + State#ch{delivering_queues = sets:del_element(QPid, DQ)}. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, @@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), notify_queues(State = #ch{state = closing}) -> {ok, State}; -notify_queues(State = #ch{consumer_mapping = Consumers}) -> - {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), - State#ch{state = closing}}. +notify_queues(State = #ch{consumer_mapping = Consumers, + delivering_queues = DQ }) -> + QPids = sets:to_list( + sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), + {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}. fold_per_queue(_F, Acc, []) -> Acc; diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 2d155d14..17e2ffb4 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -356,7 +356,7 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). -handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> +handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> gm:broadcast(GM, heartbeat), ensure_gm_heartbeat(), noreply(State); diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 04b7514f..551fdf18 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -168,19 +168,19 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Pred, MsgFun, +dropwhile(Pred, AckRequired, State = #state{gm = GM, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), - BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), + {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), ok = gm:broadcast(GM, {set_length, Len1}), Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1 }. + {Msgs, State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 } }. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -246,12 +246,9 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -fold(MsgFun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS}, AckTags) -> - BQS1 = BQ:fold(MsgFun, BQS, AckTags), - ok = gm:broadcast(GM, {fold, MsgFun, AckTags}), - State #state { backing_queue_state = BQS1 }. +fold(MsgFun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }, AckTags) -> + State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }. requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 4b095209..a7a1273d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -835,11 +835,6 @@ process_instruction({ack, MsgIds}, [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; -process_instruction({fold, MsgFun, AckTags}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - BQS1 = BQ:fold(MsgFun, BQS, AckTags), - {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c74b8d5f..04ee6ef2 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2388,10 +2388,10 @@ test_dropwhile(VQ0) -> fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages - VQ2 = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, undefined, VQ1), + {undefined, VQ2} = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, false, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2408,11 +2408,13 @@ test_dropwhile(VQ0) -> test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - VQ3 = rabbit_variable_queue:dropwhile( - fun(_) -> false end, undefined, VQ2), + {undefined, VQ3} = rabbit_variable_queue:dropwhile( + fun(_) -> false end, false, VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5). + {undefined, VQ6} = + rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5), + VQ6. test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0bfec2fd..209e5252 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,13 +16,12 @@ -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, drain_confirmed/1, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, + publish/4, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3, - multiple_routing_keys/0, fold/3]). + set_ram_duration_target/2, ram_duration/1, needs_timeout/1, + timeout/1, handle_pre_hibernate/1, status/1, invoke/3, + is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -579,23 +578,27 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, MsgFun, State) -> +dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). + +dropwhile(Pred, AckRequired, State, Msgs) -> + End = fun(S) when AckRequired -> {lists:reverse(Msgs), S}; + (S) -> {undefined, S} + end, case queue_out(State) of {empty, State1} -> - a(State1); + End(a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case {Pred(MsgProps), MsgFun} of - {true, undefined} -> + case {Pred(MsgProps), AckRequired} of + {true, true} -> + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {{Msg, _, AckTag, _}, State3} = + internal_fetch(true, MsgStatus1, State2), + dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); + {true, false} -> {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, MsgFun, State2); - {true, _} -> - {{_, _, AckTag, _}, State2} = - internal_fetch(true, MsgStatus, State1), - {MsgStatus, State3} = read_msg(MsgStatus, State2), - MsgFun(MsgStatus#msg_status.msg, AckTag), - dropwhile(Pred, MsgFun, State3); + dropwhile(Pred, AckRequired, State2, undefined); {false, _} -> - a(in_r(MsgStatus, State1)) + End(a(in_r(MsgStatus, State1))) end end. |