summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue_spec.hrl31
-rw-r--r--src/rabbit_amqqueue.erl20
-rw-r--r--src/rabbit_amqqueue_process.erl160
-rw-r--r--src/rabbit_backing_queue.erl37
-rw-r--r--src/rabbit_tests.erl33
-rw-r--r--src/rabbit_variable_queue.erl133
6 files changed, 249 insertions, 165 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index b2bf6bbb..d9296bf6 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -25,23 +25,24 @@
-type(message_properties_transformer() ::
fun ((rabbit_types:message_properties())
-> rabbit_types:message_properties())).
--type(async_callback() :: fun ((fun ((state()) -> state())) -> 'ok')).
--type(sync_callback() :: fun ((fun ((state()) -> state())) -> 'ok' | 'error')).
+-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
+-type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/5 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery(),
+-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
async_callback(), sync_callback()) -> state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/3 :: (rabbit_types:basic_message(),
- rabbit_types:message_properties(), state()) -> state()).
--spec(publish_delivered/4 :: (true, rabbit_types:basic_message(),
- rabbit_types:message_properties(), state())
+-spec(publish/4 :: (rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state()) ->
+ state()).
+-spec(publish_delivered/5 :: (true, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state())
-> {ack(), state()};
(false, rabbit_types:basic_message(),
- rabbit_types:message_properties(), state())
+ rabbit_types:message_properties(), pid(), state())
-> {undefined, state()}).
-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
-spec(dropwhile/2 ::
@@ -49,16 +50,17 @@
-> state()).
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
--spec(ack/2 :: ([ack()], state()) -> state()).
--spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
- rabbit_types:message_properties(), state()) -> state()).
+-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+-spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state()) ->
+ state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
-spec(tx_commit/4 ::
(rabbit_types:txn(), fun (() -> any()),
message_properties_transformer(), state()) -> {[ack()], state()}).
-spec(requeue/3 :: ([ack()], message_properties_transformer(), state())
- -> state()).
+ -> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
-spec(set_ram_duration_target/2 ::
@@ -68,3 +70,8 @@
-spec(idle_timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
+-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
+-spec(is_duplicate/3 ::
+ (rabbit_types:txn(), rabbit_types:basic_message(), state()) ->
+ {'false'|'published'|'discarded', state()}).
+-spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c7391965..804edc81 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -30,7 +30,7 @@
%% internal
-export([internal_declare/2, internal_delete/1,
- run_backing_queue/2, run_backing_queue_async/2,
+ run_backing_queue/3, run_backing_queue_async/3,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
emit_stats/1]).
@@ -141,10 +141,12 @@
rabbit_types:connection_exit() |
fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit())).
--spec(run_backing_queue/2 ::
- (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(run_backing_queue_async/2 ::
- (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
+-spec(run_backing_queue/3 ::
+ (pid(), atom(),
+ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
+-spec(run_backing_queue_async/3 ::
+ (pid(), atom(),
+ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -439,11 +441,11 @@ internal_delete(QueueName) ->
end
end).
-run_backing_queue(QPid, Fun) ->
- gen_server2:call(QPid, {run_backing_queue, Fun}, infinity).
+run_backing_queue(QPid, Mod, Fun) ->
+ gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity).
-run_backing_queue_async(QPid, Fun) ->
- gen_server2:cast(QPid, {run_backing_queue, Fun}).
+run_backing_queue_async(QPid, Mod, Fun) ->
+ gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2b0fe17e..110817a9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -137,8 +137,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
declare(Recover, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined,
+ State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined,
stats_timer = StatsTimer}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
@@ -149,7 +148,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = bq_init(BQ, QName, IsDurable, Recover),
+ BQS = bq_init(BQ, Q, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -159,17 +158,17 @@ declare(Recover, From,
Q1 -> {stop, normal, {existing, Q1}, State}
end.
-bq_init(BQ, QName, IsDurable, Recover) ->
+bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(QName, IsDurable, Recover,
- fun (Fun) ->
- rabbit_amqqueue:run_backing_queue_async(Self, Fun)
+ BQ:init(Q, Recover,
+ fun (Mod, Fun) ->
+ rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun)
end,
- fun (Fun) ->
+ fun (Mod, Fun) ->
rabbit_misc:with_exit_handler(
fun () -> error end,
fun () ->
- rabbit_amqqueue:run_backing_queue(Self, Fun)
+ rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end)
end).
@@ -477,45 +476,70 @@ run_message_queue(State) ->
{_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
State2.
-attempt_delivery(#delivery{txn = none,
- sender = ChPid,
- message = Message,
- msg_seq_no = MsgSeqNo} = Delivery,
- State = #q{backing_queue = BQ}) ->
+attempt_delivery(Delivery = #delivery{txn = none,
+ sender = ChPid,
+ message = Message,
+ msg_seq_no = MsgSeqNo},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Confirm = should_confirm_message(Delivery, State),
case Confirm of
immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
end,
- PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
- DeliverFun =
- fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
- %% we don't need an expiry here because messages are
- %% not being enqueued, so we use an empty
- %% message_properties.
- {AckTag, BQS1} =
- BQ:publish_delivered(
- AckRequired, Message,
- (?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = needs_confirming(Confirm)},
- BQS),
- {{Message, false, AckTag}, true,
- State1#q{backing_queue_state = BQS1}}
- end,
- {Delivered, State1} =
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
- {Delivered, Confirm, State1};
-attempt_delivery(#delivery{txn = Txn,
- sender = ChPid,
- message = Message} = Delivery,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
- {true, should_confirm_message(Delivery, State),
- State#q{backing_queue_state = BQS1}}.
+ case BQ:is_duplicate(none, Message, BQS) of
+ {false, BQS1} ->
+ PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
+ DeliverFun =
+ fun (AckRequired, false,
+ State1 = #q{backing_queue_state = BQS2}) ->
+ %% we don't need an expiry here because
+ %% messages are not being enqueued, so we use
+ %% an empty message_properties.
+ {AckTag, BQS3} =
+ BQ:publish_delivered(
+ AckRequired, Message,
+ (?BASE_MESSAGE_PROPERTIES)#message_properties{
+ needs_confirming = needs_confirming(Confirm)},
+ ChPid, BQS2),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS3}}
+ end,
+ {Delivered, State2} =
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
+ State#q{backing_queue_state = BQS1}),
+ {Delivered, Confirm, State2};
+ {Duplicate, BQS1} ->
+ %% if the message has previously been seen by the BQ then
+ %% it must have been seen under the same circumstances as
+ %% now: i.e. if it is now a deliver_immediately then it
+ %% must have been before.
+ Delivered = case Duplicate of
+ published -> true;
+ discarded -> false
+ end,
+ {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
+ end;
+attempt_delivery(Delivery = #delivery{txn = Txn,
+ sender = ChPid,
+ message = Message},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Confirm = should_confirm_message(Delivery, State),
+ case BQ:is_duplicate(Txn, Message, BQS) of
+ {false, BQS1} ->
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
+ BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
+ BQS1),
+ {true, Confirm, State#q{backing_queue_state = BQS2}};
+ {Duplicate, BQS1} ->
+ Delivered = case Duplicate of
+ published -> true;
+ discarded -> false
+ end,
+ {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
+ end.
-deliver_or_enqueue(Delivery = #delivery{message = Message}, State) ->
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ sender = ChPid}, State) ->
{Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
@@ -525,14 +549,17 @@ deliver_or_enqueue(Delivery = #delivery{message = Message}, State) ->
BQ:publish(Message,
(message_properties(State)) #message_properties{
needs_confirming = needs_confirming(Confirm)},
- BQS),
+ ChPid, BQS),
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
run_backing_queue(
- fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end,
- State).
+ BQ, fun (M, BQS) ->
+ {_MsgIds, BQS1} =
+ M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
+ BQS1
+ end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -635,10 +662,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
- run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State).
+ run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State).
-run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
+run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
State = #q{backing_queue = BQ,
@@ -662,6 +690,12 @@ rollback_transaction(Txn, C, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
+discard_delivery(#delivery{sender = ChPid,
+ message = Message},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}.
+
reset_msg_expiry_fun(TTL) ->
fun(MsgProps) ->
MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)}
@@ -768,11 +802,11 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {run_backing_queue, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -788,7 +822,7 @@ prioritise_cast(Msg, _State) ->
{reject, _AckTags, _Requeue, _ChPid} -> 7;
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
- {run_backing_queue, _Fun} -> 6;
+ {run_backing_queue, _Mod, _Fun} -> 6;
sync_timeout -> 6;
_ -> 0
end.
@@ -807,14 +841,14 @@ handle_call({init, Recover}, From,
true -> erlang:monitor(process, Owner),
declare(Recover, From, State);
false -> #q{backing_queue = BQ, backing_queue_state = undefined,
- q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ q = #amqqueue{name = QName} = Q} = State,
gen_server2:reply(From, not_found),
case Recover of
true -> ok;
_ -> rabbit_log:warning(
"Queue ~p exclusive owner went away~n", [QName])
end,
- BQS = bq_init(BQ, QName, IsDurable, Recover),
+ BQS = bq_init(BQ, Q, Recover),
%% Rely on terminate to delete the queue.
{stop, normal, State#q{backing_queue_state = BQS}}
end;
@@ -848,7 +882,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
{Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
reply(Delivered, case Delivered of
true -> maybe_record_confirm_message(Confirm, State1);
- false -> State1
+ false -> discard_delivery(Delivery, State1)
end);
handle_call({deliver, Delivery}, From, State) ->
@@ -1004,12 +1038,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({run_backing_queue, Fun}, _From, State) ->
- reply(ok, run_backing_queue(Fun, State)).
+handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, run_backing_queue(Mod, Fun, State)).
-handle_cast({run_backing_queue, Fun}, State) ->
- noreply(run_backing_queue(Fun, State));
+handle_cast({run_backing_queue, Mod, Fun}, State) ->
+ noreply(run_backing_queue(Mod, Fun, State));
handle_cast(sync_timeout, State) ->
noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
@@ -1028,7 +1062,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- BQS1 = BQ:ack(AckTags, BQS),
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
{NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
@@ -1049,7 +1083,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> BQS1 = BQ:ack(AckTags, BQS),
+ false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
State#q{backing_queue_state = BQS1}
end)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 0ca8d260..0955a080 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -35,19 +35,18 @@ behaviour_info(callbacks) ->
%% Initialise the backing queue and its state.
%%
%% Takes
- %% 1. the queue name
- %% 2. a boolean indicating whether the queue is durable
- %% 3. a boolean indicating whether the queue is an existing queue
+ %% 1. the amqqueue record
+ %% 2. a boolean indicating whether the queue is an existing queue
%% that should be recovered
- %% 4. an asynchronous callback which accepts a function of type
+ %% 3. an asynchronous callback which accepts a function of type
%% backing-queue-state to backing-queue-state. This callback
%% function can be safely invoked from any process, which
%% makes it useful for passing messages back into the backing
%% queue, especially as the backing queue does not have
%% control of its own mailbox.
- %% 5. a synchronous callback. Same as the asynchronous callback
+ %% 4. a synchronous callback. Same as the asynchronous callback
%% but waits for completion and returns 'error' on error.
- {init, 5},
+ {init, 4},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 1},
@@ -61,12 +60,12 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 3},
+ {publish, 4},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 4},
+ {publish_delivered, 5},
%% Return ids of messages which have been confirmed since
%% the last invocation of this function (or initialisation).
@@ -109,7 +108,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 4},
+ {tx_publish, 5},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
@@ -165,7 +164,25 @@ behaviour_info(callbacks) ->
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues backing_queue_status
- {status, 1}
+ {status, 1},
+
+ %% Passed a function to be invoked with the relevant backing
+ %% queue's state. Useful for when the backing queue or other
+ %% components need to pass functions into the backing queue.
+ {invoke, 3},
+
+ %% Called prior to a publish or publish_delivered call. Allows
+ %% the BQ to signal that it's already seen this message (and in
+ %% what capacity - i.e. was it published previously or discarded
+ %% previously) and thus the message should be dropped.
+ {is_duplicate, 3},
+
+ %% Called to inform the BQ about messages which have reached the
+ %% queue, but are not going to be further passed to BQ for some
+ %% reason. Note that this is may be invoked for messages for
+ %% which BQ:is_duplicate/2 has already returned {'published' |
+ %% 'discarded', BQS}.
+ {discard, 3}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 294fae97..2ef07071 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2072,9 +2072,9 @@ test_queue_index() ->
passed.
-variable_queue_init(QName, IsDurable, Recover) ->
- rabbit_variable_queue:init(QName, IsDurable, Recover,
- fun nop/1, fun nop/1, fun nop/2, fun nop/1).
+variable_queue_init(Q, Recover) ->
+ rabbit_variable_queue:init(
+ Q, Recover, fun nop/1, fun nop/1, fun nop/2, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
lists:foldl(
@@ -2086,7 +2086,7 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
true -> 2;
false -> 1
end}, <<>>),
- #message_properties{}, VQN)
+ #message_properties{}, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -2104,9 +2104,13 @@ assert_prop(List, Prop, Value) ->
assert_props(List, PropVals) ->
[assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
+test_amqqueue(Durable) ->
+ (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
+ #amqqueue { durable = Durable }.
+
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = variable_queue_init(test_queue(), true, false),
+ VQ = variable_queue_init(test_amqqueue(true), false),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -2164,7 +2168,7 @@ test_dropwhile(VQ0) ->
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{}, <<>>),
- #message_properties{expiry = N}, VQN)
+ #message_properties{expiry = N}, self(), VQN)
end, VQ0, lists:seq(1, Count)),
%% drop the first 5 messages
@@ -2208,7 +2212,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2218,7 +2222,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2252,7 +2256,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2281,7 +2285,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = variable_queue_init(test_queue(), true, true),
+ VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2294,17 +2298,18 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
- VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
+ {_Guids, VQ4} =
+ rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = variable_queue_init(test_queue(), true, true),
+ VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
TxID = rabbit_guid:guid(),
- {new, #amqqueue { pid = QPid, name = QName }} =
+ {new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
[begin
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
@@ -2328,7 +2333,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = variable_queue_init(QName, true, true),
+ VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ff7252fd..7a3c17a2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,18 +16,19 @@
-module(rabbit_variable_queue).
--export([init/5, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, drain_confirmed/1,
- fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+-export([init/4, terminate/1, delete_and_terminate/1,
+ purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
+ fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, multiple_routing_keys/0]).
+ status/1, invoke/3, is_duplicate/3, discard/3,
+ multiple_routing_keys/0]).
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/7]).
+-export([start_msg_store/2, stop_msg_store/0, init/6]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -408,15 +409,15 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) ->
- init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback,
+init(Queue, Recover, AsyncCallback, SyncCallback) ->
+ init(Queue, Recover, AsyncCallback, SyncCallback,
fun (MsgIds, ActionTaken) ->
msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
end,
fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
-init(QueueName, IsDurable, false, AsyncCallback, SyncCallback,
- MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+ AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback,
case IsDurable of
@@ -426,8 +427,8 @@ init(QueueName, IsDurable, false, AsyncCallback, SyncCallback,
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(QueueName, true, true, AsyncCallback, SyncCallback,
- MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName, durable = true }, true,
+ AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -517,13 +518,14 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProps, State) ->
+publish(Msg, MsgProps, _ChPid, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
publish_delivered(false, #basic_message { id = MsgId },
#message_properties { needs_confirming = NeedsConfirming },
- State = #vqstate { async_callback = Callback, len = 0 }) ->
+ _ChPid, State = #vqstate { async_callback = Callback,
+ len = 0 }) ->
case NeedsConfirming of
true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
false -> ok
@@ -533,13 +535,13 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
- State = #vqstate { len = 0,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ _ChPid, State = #vqstate { len = 0,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
@@ -665,13 +667,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- a(ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
- AckTags, State)).
+ {MsgIds, State1} = ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State),
+ {MsgIds, a(State1)}.
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
- State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
+ _ChPid, State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
case IsPersistent andalso IsDurable of
@@ -727,7 +730,7 @@ requeue(AckTags, MsgPropsFun, State) ->
(MsgPropsFun(MsgProps)) #message_properties {
needs_confirming = false }
end,
- a(reduce_memory_use(
+ {MsgIds, State1} =
ack(fun (_, _, _) -> ok end,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
@@ -742,7 +745,8 @@ requeue(AckTags, MsgPropsFun, State) ->
true, true, State2),
State3
end,
- AckTags, State))).
+ AckTags, State),
+ {MsgIds, a(reduce_memory_use(State1))}.
len(#vqstate { len = Len }) -> Len.
@@ -880,6 +884,13 @@ status(#vqstate {
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ].
+invoke(?MODULE, Fun, State) ->
+ Fun(?MODULE, State).
+
+is_duplicate(_Txn, _Msg, State) -> {false, State}.
+
+discard(_Msg, _ChPid, State) -> State.
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
@@ -954,8 +965,8 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_msg_store:client_init(
- MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end).
+ rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
+ fun () -> Callback(?MODULE, CloseFDsFun) end).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
@@ -983,7 +994,7 @@ msg_store_close_fds(MSCState, IsPersistent) ->
fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
msg_store_close_fds_fun(IsPersistent) ->
- fun (State = #vqstate { msg_store_clients = MSCState }) ->
+ fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
{ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
State #vqstate { msg_store_clients = MSCState1 }
end.
@@ -1129,7 +1140,8 @@ blank_rate(Timestamp, IngressLength) ->
msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun,
AsyncCallback, SyncCallback) ->
- case SyncCallback(fun (StateN) ->
+ case SyncCallback(?MODULE,
+ fun (?MODULE, StateN) ->
tx_commit_post_msg_store(true, Pubs, AckTags,
Fun, MsgPropsFun, StateN)
end) of
@@ -1192,20 +1204,21 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Acks = lists:append(SAcks),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
- {SeqIds, State1 = #vqstate { index_state = IndexState }} =
+ {_MsgIds, State1} = ack(Acks, State),
+ {SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent },
MsgProps},
- {SeqIdsAcc, State2}) ->
+ {SeqIdsAcc, State3}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} =
- publish(Msg, MsgProps, false, IsPersistent1, State2),
- {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, ack(Acks, State)}, Pubs),
+ {SeqId, State4} =
+ publish(Msg, MsgProps, false, IsPersistent1, State3),
+ {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4}
+ end, {PAcks, State1}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
- State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
+ State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1352,7 +1365,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, MsgIdsByStore} =
+ {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1371,9 +1384,9 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- State;
+ {[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, MsgIdsByStore},
+ {{PersistentSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1393,21 +1406,24 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
orddict:new(), MsgIdsByStore)),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }.
+ {lists:reverse(AllMsgIds),
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }}.
-accumulate_ack_init() -> {[], orddict:new()}.
+accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false },
- {PersistentSeqIdsAcc, MsgIdsByStore}) ->
- {PersistentSeqIdsAcc, MsgIdsByStore};
+ index_on_disk = false,
+ msg_id = MsgId },
+ {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps},
- {PersistentSeqIdsAcc, MsgIdsByStore}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}.
+ rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
+ [MsgId | AllMsgIds]}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1451,14 +1467,16 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
blind_confirm(Callback, MsgIdSet) ->
- Callback(fun (State) -> record_confirms(MsgIdSet, State) end).
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
msgs_written_to_disk(Callback, MsgIdSet, removed) ->
blind_confirm(Callback, MsgIdSet);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
- Callback(fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ Callback(?MODULE,
+ fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
Confirmed = gb_sets:intersection(UC, MsgIdSet),
record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
State #vqstate {
@@ -1467,9 +1485,10 @@ msgs_written_to_disk(Callback, MsgIdSet, written) ->
end).
msg_indices_written_to_disk(Callback, MsgIdSet) ->
- Callback(fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ Callback(?MODULE,
+ fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
Confirmed = gb_sets:intersection(UC, MsgIdSet),
record_confirms(gb_sets:intersection(MsgIdSet, MOD),
State #vqstate {