summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-04-24 13:00:04 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-04-24 13:00:04 +0100
commit4aea09ab8900c2be0b21a17214a2209121b4549b (patch)
tree6b63b270e96d802ec97a268f0c4c07d49802fa44 /src
parentf31f2cfbdf1c6c2f1895110c4fb22835a0ea276b (diff)
parentafe952206ba04359c334337321ada308fc7280a8 (diff)
downloadrabbitmq-server-4aea09ab8900c2be0b21a17214a2209121b4549b.tar.gz
Merge bug24848
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl119
-rw-r--r--src/rabbit_backing_queue.erl15
-rw-r--r--src/rabbit_backing_queue_qc.erl7
-rw-r--r--src/rabbit_basic.erl2
-rw-r--r--src/rabbit_channel.erl41
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl17
-rw-r--r--src/rabbit_mirror_queue_slave.erl5
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl39
10 files changed, 147 insertions, 116 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3caf728b..5701efeb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -696,12 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ }) ->
Now = now_micros(),
- BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- dead_letter_fun(expired, State),
- BQS),
+ DLXFun = dead_letter_fun(expired, State),
+ ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ case DLXFun of
+ undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
+ BQS1;
+ _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
+ lists:foreach(
+ fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
+ BQS1
+ end,
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -717,6 +723,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
+ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1};
+ack_if_no_dlx(_AckTags, State) ->
+ State.
+
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
undefined;
dead_letter_fun(Reason, _State) ->
@@ -724,22 +738,25 @@ dead_letter_fun(Reason, _State) ->
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
end.
-dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
- case rabbit_exchange:lookup(DLX) of
- {error, not_found} -> noreply(State);
- _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State)
+dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
+ DLMsg = #basic_message{exchange_name = XName} =
+ make_dead_letter_msg(Reason, Msg, State),
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} ->
+ Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ {Queues, Cycles} = detect_dead_letter_cycles(
+ DLMsg, rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(Queues),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids;
+ {error, not_found} ->
+ []
end.
-dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC,
- dlx = DLX}) ->
- {ok, _, QPids} =
- rabbit_basic:publish(
- rabbit_basic:delivery(
- false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
- MsgSeqNo)),
+dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC}) ->
+ QPids = dead_letter_publish(Msg, Reason, State),
State1 = State#q{queue_monitors = pmon:monitor_all(
QPids, State#q.queue_monitors),
publish_seqno = MsgSeqNo + 1},
@@ -797,56 +814,58 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
false -> noreply(State1)
end.
-already_been_here(_Delivery, #q{dlx = undefined}) ->
- false;
-already_been_here(#delivery{message = #basic_message{content = Content}},
- State) ->
+detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
- #resource{name = QueueName} = qname(State),
+ NoCycles = {Queues, []},
case Headers of
undefined ->
- false;
+ NoCycles;
_ ->
case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
{array, DeathTables} ->
OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
{table, D} <- DeathTables],
OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- case lists:member(QueueName, OldQueues1) of
- true -> [QueueName | OldQueues1];
- _ -> false
- end;
+ OldQueuesSet = ordsets:from_list(OldQueues1),
+ {Cycling, NotCycling} =
+ lists:partition(
+ fun(Queue) ->
+ ordsets:is_element(Queue#resource.name,
+ OldQueuesSet)
+ end, Queues),
+ {NotCycling, [[QName | OldQueues1] ||
+ #resource{name = QName} <- Cycling]};
_ ->
- false
+ NoCycles
end
end.
-make_dead_letter_msg(DLX, Reason,
+make_dead_letter_msg(Reason,
Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State = #q{dlx_routing_key = DlxRoutingKey}) ->
+ State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
{DeathRoutingKeys, HeadersFun1} =
case DlxRoutingKey of
undefined -> {RoutingKeys, fun (H) -> H end};
_ -> {[DlxRoutingKey],
fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
+ ReasonBin = list_to_binary(atom_to_list(Reason)),
#resource{name = QName} = qname(State),
+ TimeSec = rabbit_misc:now_ms() div 1000,
HeadersFun2 =
fun (Headers) ->
%% The first routing key is the one specified in the
%% basic.publish; all others are CC or BCC keys.
- RoutingKeys1 =
- [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- Info = [{<<"reason">>,
- longstr, list_to_binary(atom_to_list(Reason))},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array,
- [{longstr, Key} || Key <- RoutingKeys1]}],
+ RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ RKs1 = [{longstr, Key} || Key <- RKs],
+ Info = [{<<"reason">>, longstr, ReasonBin},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, TimeSec},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array, RKs1}],
HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
Info, Headers))
end,
@@ -1196,8 +1215,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender,
- msg_seq_no = MsgSeqNo}, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
State = #q{senders = Senders}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
Senders1 = case Flow of
@@ -1206,12 +1224,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- case already_been_here(Delivery, State1) of
- false -> noreply(deliver_or_enqueue(Delivery, State1));
- Qs -> log_cycle_once(Qs),
- rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
- noreply(State1)
- end;
+ noreply(deliver_or_enqueue(Delivery, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
@@ -1227,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
ChPid, AckTags, State,
case Requeue of
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
- false -> Fun = dead_letter_fun(rejected, State),
- fun (State1 = #q{backing_queue = BQ,
+ false -> fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ Fun = dead_letter_fun(rejected, State1),
BQS1 = BQ:fold(Fun, BQS, AckTags),
- State1#q{backing_queue_state = BQS1}
+ ack_if_no_dlx(
+ AckTags,
+ State1#q{backing_queue_state = BQS1})
end
end));
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 6cc1c3fd..28c57bb0 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -35,6 +35,7 @@
-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
'undefined').
+-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
%% aren't being started at this point, but this call allows the
@@ -117,12 +118,14 @@
%% be ignored.
-callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
-%% Drop messages from the head of the queue while the supplied
-%% predicate returns true. A callback function is supplied allowing
-%% callers access to messages that are about to be dropped.
--callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(),
- state())
- -> state().
+%% Drop messages from the head of the queue while the supplied predicate returns
+%% true. Also accepts a boolean parameter that determines whether the messages
+%% necessitate an ack or not. If they do, the function returns a list of
+%% messages with the respective acktags.
+-callback dropwhile(msg_pred(), true, state())
+ -> {[{rabbit_types:basic_message(), ack()}], state()};
+ (msg_pred(), false, state())
+ -> {undefined, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 286b69e4..a84800c0 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
@@ -267,10 +267,11 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
S#state{bqstate = BQ1};
-next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
+next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
+ BQ = {call, erlang, element, [2, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
- S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1};
+ S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
S;
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 8ad59016..17d848da 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -63,7 +63,7 @@
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
--spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers()))
+-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content())
-> rabbit_types:content()).
-spec(header_routes/1 ::
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 846890a1..22c6a223 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,9 +36,9 @@
conn_name, limiter, tx_status, next_tag, unacked_message_q,
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, queue_collector_pid,
- stats_timer, confirm_enabled, publish_seqno, unconfirmed,
- confirmed, capabilities, trace_state}).
+ consumer_mapping, blocking, queue_consumers, delivering_queues,
+ queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
+ unconfirmed, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
+ delivering_queues = sets:new(),
queue_collector_pid = CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
@@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors = pmon:erase(
- QPid, State3#ch.queue_monitors)});
+ QPid, State4#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, _QPid, _MsgId, Redelivered,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, record_sent(none, not(NoAck), Msg, State)};
+ State1 = monitor_delivering_queue(NoAck, QPid, State),
+ {noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q} ->
- State1 = State#ch{consumer_mapping =
- dict:store(ActualConsumerTag, Q,
- ConsumerMapping)},
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
+ State1 = monitor_delivering_queue(
+ NoAck, QPid, State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag,
State
end.
+monitor_delivering_queue(true, _QPid, State) ->
+ State;
+monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ delivering_queues = sets:add_element(QPid, DQ)}.
+
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
case rabbit_misc:is_abnormal_termination(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
@@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid,
State#ch{consumer_mapping = ConsumerMapping1,
queue_consumers = dict:erase(QPid, QCons)}.
+handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
+ State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
notify_queues(State = #ch{state = closing}) ->
{ok, State};
-notify_queues(State = #ch{consumer_mapping = Consumers}) ->
- {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
- State#ch{state = closing}}.
+notify_queues(State = #ch{consumer_mapping = Consumers,
+ delivering_queues = DQ }) ->
+ QPids = sets:to_list(
+ sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
+ {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
fold_per_queue(_F, Acc, []) ->
Acc;
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 2d155d14..17e2ffb4 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -356,7 +356,7 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
-handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
+handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
gm:broadcast(GM, heartbeat),
ensure_gm_heartbeat(),
noreply(State);
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 04b7514f..551fdf18 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -168,19 +168,19 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Pred, MsgFun,
+dropwhile(Pred, AckRequired,
State = #state{gm = GM,
backing_queue = BQ,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
+ {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
ok = gm:broadcast(GM, {set_length, Len1}),
Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 }.
+ {Msgs, State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -246,12 +246,9 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-fold(MsgFun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS}, AckTags) ->
- BQS1 = BQ:fold(MsgFun, BQS, AckTags),
- ok = gm:broadcast(GM, {fold, MsgFun, AckTags}),
- State #state { backing_queue_state = BQS1 }.
+fold(MsgFun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 4b095209..a7a1273d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -835,11 +835,6 @@ process_instruction({ack, MsgIds},
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
-process_instruction({fold, MsgFun, AckTags},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- BQS1 = BQ:fold(MsgFun, BQS, AckTags),
- {ok, State #state { backing_queue_state = BQS1 }};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c74b8d5f..04ee6ef2 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2388,10 +2388,10 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- VQ2 = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, undefined, VQ1),
+ {undefined, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, false, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2408,11 +2408,13 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- VQ3 = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, undefined, VQ2),
+ {undefined, VQ3} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, false, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5).
+ {undefined, VQ6} =
+ rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
+ VQ6.
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0bfec2fd..209e5252 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,13 +16,12 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
+ publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3,
- multiple_routing_keys/0, fold/3]).
+ set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
+ timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
+ is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -579,23 +578,27 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, MsgFun, State) ->
+dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+
+dropwhile(Pred, AckRequired, State, Msgs) ->
+ End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
+ (S) -> {undefined, S}
+ end,
case queue_out(State) of
{empty, State1} ->
- a(State1);
+ End(a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), MsgFun} of
- {true, undefined} ->
+ case {Pred(MsgProps), AckRequired} of
+ {true, true} ->
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ {{Msg, _, AckTag, _}, State3} =
+ internal_fetch(true, MsgStatus1, State2),
+ dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
+ {true, false} ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, MsgFun, State2);
- {true, _} ->
- {{_, _, AckTag, _}, State2} =
- internal_fetch(true, MsgStatus, State1),
- {MsgStatus, State3} = read_msg(MsgStatus, State2),
- MsgFun(MsgStatus#msg_status.msg, AckTag),
- dropwhile(Pred, MsgFun, State3);
+ dropwhile(Pred, AckRequired, State2, undefined);
{false, _} ->
- a(in_r(MsgStatus, State1))
+ End(a(in_r(MsgStatus, State1)))
end
end.