summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-10-20 16:22:35 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-10-20 16:22:35 +0100
commitaade042ed7a485c21481bee2ffaf7652652711a2 (patch)
tree85a60f29187e5b02d49ee48753abf5e14f2c0751
parent54e0b5f85c133dc164a4c927c42a278aabd7dbfa (diff)
downloadrabbitmq-server-aade042ed7a485c21481bee2ffaf7652652711a2.tar.gz
Refrain from resetting message expiry in message requeue
-rw-r--r--include/rabbit_backing_queue_spec.hrl5
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_tests.erl7
-rw-r--r--src/rabbit_variable_queue.erl47
7 files changed, 36 insertions, 50 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 20fe4234..4a657951 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -22,9 +22,6 @@
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(confirm_required() :: boolean()).
--type(message_properties_transformer() ::
- fun ((rabbit_types:message_properties())
- -> rabbit_types:message_properties())).
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
@@ -51,7 +48,7 @@
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
--spec(requeue/3 :: ([ack()], message_properties_transformer(), state())
+-spec(requeue/2 :: ([ack()], state())
-> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 46f6674b..62ccbcb0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -554,9 +554,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
run_backing_queue(
BQ, fun (M, BQS) ->
- {_MsgIds, BQS1} =
- M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
- BQS1
+ {_MsgIds, BQS1} = M:requeue(AckTags, BQS), BQS1
end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
@@ -670,11 +668,6 @@ discard_delivery(#delivery{sender = ChPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}.
-reset_msg_expiry_fun(TTL) ->
- fun(MsgProps) ->
- MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)}
- end.
-
message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 77278416..c3b322ee 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -107,7 +107,7 @@ behaviour_info(callbacks) ->
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
- {requeue, 3},
+ {requeue, 2},
%% How long is my queue?
{len, 1},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 328fe639..f60562ef 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
+ requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3]).
@@ -248,11 +248,11 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
- ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}),
+requeue(AckTags, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f423760a..7182042d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -827,14 +827,14 @@ process_instruction({ack, MsgIds},
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
-process_instruction({requeue, MsgPropsFun, MsgIds},
+process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{ok, case length(AckTags) =:= length(MsgIds) of
true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 };
false ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3a4f6f84..fcfd5557 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2238,11 +2238,10 @@ test_variable_queue_requeue(VQ0) ->
(_, Acc) ->
Acc
end, [], lists:zip(Acks, Seq)),
- {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset,
- fun(X) -> X end, VQ3),
+ {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3),
VQ5 = lists:foldl(fun (AckTag, VQN) ->
{_MsgId, VQM} = rabbit_variable_queue:requeue(
- [AckTag], fun(X) -> X end, VQN),
+ [AckTag], VQN),
VQM
end, VQ4, Subset),
VQ6 = lists:foldl(fun (AckTag, VQa) ->
@@ -2426,7 +2425,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
{_Guids, VQ4} =
- rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
+ rabbit_variable_queue:requeue(AckTags, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 895fc388..1de7be32 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1,
+ dropwhile/2, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
@@ -628,21 +628,19 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
- q3 = Q3,
- q4 = Q4,
- in_counter = InCounter,
- len = Len } = State) ->
+requeue(AckTags, #vqstate { delta = Delta,
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
beta_limit(Q3),
- fun publish_alpha/2,
- MsgPropsFun, State),
+ fun publish_alpha/2, State),
{SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
delta_limit(Delta),
- fun publish_beta/2,
- MsgPropsFun, State1),
+ fun publish_beta/2, State1),
{Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
- MsgPropsFun, State2),
+ State2),
MsgCount = length(MsgIds2),
{MsgIds2, a(reduce_memory_use(
State3 #vqstate { delta = Delta1,
@@ -1335,50 +1333,49 @@ publish_beta(MsgStatus, State) ->
ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
-queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) ->
+queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
- Limit, PubFun, MsgPropsFun, State).
+ Limit, PubFun, State).
queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
- Limit, PubFun, MsgPropsFun, State)
+ Limit, PubFun, State)
when Limit == undefined orelse SeqId < Limit ->
case ?QUEUE:out(Q) of
{{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
when SeqIdQ < SeqId ->
%% enqueue from the remaining queue
queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
- Limit, PubFun, MsgPropsFun, State);
+ Limit, PubFun, State);
{_, _Q1} ->
%% enqueue from the remaining list of sequence ids
- {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun,
- State),
+ {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, MsgPropsFun, State2)
+ Limit, PubFun, State2)
end;
queue_merge(SeqIds, Q, Front, MsgIds,
- _Limit, _PubFun, _MsgPropsFun, State) ->
+ _Limit, _PubFun, State) ->
{SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
-delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
+delta_merge([], Delta, MsgIds, State) ->
{Delta, MsgIds, State};
-delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) ->
+delta_merge(SeqIds, Delta, MsgIds, State) ->
lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
{#msg_status { msg_id = MsgId } = MsgStatus, State1} =
- msg_from_pending_ack(SeqId, MsgPropsFun, State0),
+ msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
-msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
+msg_from_pending_ack(SeqId, State) ->
{#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
remove_pending_ack(SeqId, State),
{MsgStatus #msg_status {
- msg_props = (MsgPropsFun(MsgProps)) #message_properties {
- needs_confirming = false } }, State1}.
+ msg_props = MsgProps #message_properties { needs_confirming = false } },
+ State1}.
beta_limit(Q) ->
case ?QUEUE:peek(Q) of