diff options
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 31 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 20 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 160 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 37 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 33 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 133 |
6 files changed, 249 insertions, 165 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index b2bf6bbb..d9296bf6 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -25,23 +25,24 @@ -type(message_properties_transformer() :: fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). --type(async_callback() :: fun ((fun ((state()) -> state())) -> 'ok')). --type(sync_callback() :: fun ((fun ((state()) -> state())) -> 'ok' | 'error')). +-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). +-type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(init/5 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery(), +-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(), async_callback(), sync_callback()) -> state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/3 :: (rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) -> state()). --spec(publish_delivered/4 :: (true, rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) +-spec(publish/4 :: (rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> + state()). +-spec(publish_delivered/5 :: (true, rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> {ack(), state()}; (false, rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) + rabbit_types:message_properties(), pid(), state()) -> {undefined, state()}). -spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). -spec(dropwhile/2 :: @@ -49,16 +50,17 @@ -> state()). -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). --spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) -> state()). +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). +-spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> + state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). -spec(tx_commit/4 :: (rabbit_types:txn(), fun (() -> any()), message_properties_transformer(), state()) -> {[ack()], state()}). -spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) - -> state()). + -> {[rabbit_guid:guid()], state()}). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: @@ -68,3 +70,8 @@ -spec(idle_timeout/1 :: (state()) -> state()). -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). +-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). +-spec(is_duplicate/3 :: + (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> + {'false'|'published'|'discarded', state()}). +-spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c7391965..804edc81 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,7 +30,7 @@ %% internal -export([internal_declare/2, internal_delete/1, - run_backing_queue/2, run_backing_queue_async/2, + run_backing_queue/3, run_backing_queue_async/3, sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1, emit_stats/1]). @@ -141,10 +141,12 @@ rabbit_types:connection_exit() | fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit())). --spec(run_backing_queue/2 :: - (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(run_backing_queue_async/2 :: - (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue/3 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue_async/3 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). @@ -439,11 +441,11 @@ internal_delete(QueueName) -> end end). -run_backing_queue(QPid, Fun) -> - gen_server2:call(QPid, {run_backing_queue, Fun}, infinity). +run_backing_queue(QPid, Mod, Fun) -> + gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity). -run_backing_queue_async(QPid, Fun) -> - gen_server2:cast(QPid, {run_backing_queue, Fun}). +run_backing_queue_async(QPid, Mod, Fun) -> + gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2b0fe17e..110817a9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -137,8 +137,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- declare(Recover, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined, + State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined, stats_timer = StatsTimer}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; @@ -149,7 +148,7 @@ declare(Recover, From, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQS = bq_init(BQ, QName, IsDurable, Recover), + BQS = bq_init(BQ, Q, Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -159,17 +158,17 @@ declare(Recover, From, Q1 -> {stop, normal, {existing, Q1}, State} end. -bq_init(BQ, QName, IsDurable, Recover) -> +bq_init(BQ, Q, Recover) -> Self = self(), - BQ:init(QName, IsDurable, Recover, - fun (Fun) -> - rabbit_amqqueue:run_backing_queue_async(Self, Fun) + BQ:init(Q, Recover, + fun (Mod, Fun) -> + rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun) end, - fun (Fun) -> + fun (Mod, Fun) -> rabbit_misc:with_exit_handler( fun () -> error end, fun () -> - rabbit_amqqueue:run_backing_queue(Self, Fun) + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end) end). @@ -477,45 +476,70 @@ run_message_queue(State) -> {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), State2. -attempt_delivery(#delivery{txn = none, - sender = ChPid, - message = Message, - msg_seq_no = MsgSeqNo} = Delivery, - State = #q{backing_queue = BQ}) -> +attempt_delivery(Delivery = #delivery{txn = none, + sender = ChPid, + message = Message, + msg_seq_no = MsgSeqNo}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), case Confirm of immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, - PredFun = fun (IsEmpty, _State) -> not IsEmpty end, - DeliverFun = - fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> - %% we don't need an expiry here because messages are - %% not being enqueued, so we use an empty - %% message_properties. - {AckTag, BQS1} = - BQ:publish_delivered( - AckRequired, Message, - (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = needs_confirming(Confirm)}, - BQS), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS1}} - end, - {Delivered, State1} = - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), - {Delivered, Confirm, State1}; -attempt_delivery(#delivery{txn = Txn, - sender = ChPid, - message = Message} = Delivery, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), - {true, should_confirm_message(Delivery, State), - State#q{backing_queue_state = BQS1}}. + case BQ:is_duplicate(none, Message, BQS) of + {false, BQS1} -> + PredFun = fun (IsEmpty, _State) -> not IsEmpty end, + DeliverFun = + fun (AckRequired, false, + State1 = #q{backing_queue_state = BQS2}) -> + %% we don't need an expiry here because + %% messages are not being enqueued, so we use + %% an empty message_properties. + {AckTag, BQS3} = + BQ:publish_delivered( + AckRequired, Message, + (?BASE_MESSAGE_PROPERTIES)#message_properties{ + needs_confirming = needs_confirming(Confirm)}, + ChPid, BQS2), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS3}} + end, + {Delivered, State2} = + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, + State#q{backing_queue_state = BQS1}), + {Delivered, Confirm, State2}; + {Duplicate, BQS1} -> + %% if the message has previously been seen by the BQ then + %% it must have been seen under the same circumstances as + %% now: i.e. if it is now a deliver_immediately then it + %% must have been before. + Delivered = case Duplicate of + published -> true; + discarded -> false + end, + {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + end; +attempt_delivery(Delivery = #delivery{txn = Txn, + sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Confirm = should_confirm_message(Delivery, State), + case BQ:is_duplicate(Txn, Message, BQS) of + {false, BQS1} -> + store_ch_record((ch_record(ChPid))#cr{txn = Txn}), + BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, + BQS1), + {true, Confirm, State#q{backing_queue_state = BQS2}}; + {Duplicate, BQS1} -> + Delivered = case Duplicate of + published -> true; + discarded -> false + end, + {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + end. -deliver_or_enqueue(Delivery = #delivery{message = Message}, State) -> +deliver_or_enqueue(Delivery = #delivery{message = Message, + sender = ChPid}, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), @@ -525,14 +549,17 @@ deliver_or_enqueue(Delivery = #delivery{message = Message}, State) -> BQ:publish(Message, (message_properties(State)) #message_properties{ needs_confirming = needs_confirming(Confirm)}, - BQS), + ChPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> run_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end, - State). + BQ, fun (M, BQS) -> + {_MsgIds, BQS1} = + M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS), + BQS1 + end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -635,10 +662,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> - run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State). + run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State). -run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = Fun(BQS)}). +run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, State = #q{backing_queue = BQ, @@ -662,6 +690,12 @@ rollback_transaction(Txn, C, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +discard_delivery(#delivery{sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}. + reset_msg_expiry_fun(TTL) -> fun(MsgProps) -> MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} @@ -768,11 +802,11 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - {run_backing_queue, _Fun} -> 6; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {run_backing_queue, _Mod, _Fun} -> 6; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -788,7 +822,7 @@ prioritise_cast(Msg, _State) -> {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; - {run_backing_queue, _Fun} -> 6; + {run_backing_queue, _Mod, _Fun} -> 6; sync_timeout -> 6; _ -> 0 end. @@ -807,14 +841,14 @@ handle_call({init, Recover}, From, true -> erlang:monitor(process, Owner), declare(Recover, From, State); false -> #q{backing_queue = BQ, backing_queue_state = undefined, - q = #amqqueue{name = QName, durable = IsDurable}} = State, + q = #amqqueue{name = QName} = Q} = State, gen_server2:reply(From, not_found), case Recover of true -> ok; _ -> rabbit_log:warning( "Queue ~p exclusive owner went away~n", [QName]) end, - BQS = bq_init(BQ, QName, IsDurable, Recover), + BQS = bq_init(BQ, Q, Recover), %% Rely on terminate to delete the queue. {stop, normal, State#q{backing_queue_state = BQS}} end; @@ -848,7 +882,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), reply(Delivered, case Delivered of true -> maybe_record_confirm_message(Confirm, State1); - false -> State1 + false -> discard_delivery(Delivery, State1) end); handle_call({deliver, Delivery}, From, State) -> @@ -1004,12 +1038,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue_and_run(AckTags, State)) end; -handle_call({run_backing_queue, Fun}, _From, State) -> - reply(ok, run_backing_queue(Fun, State)). +handle_call({run_backing_queue, Mod, Fun}, _From, State) -> + reply(ok, run_backing_queue(Mod, Fun, State)). -handle_cast({run_backing_queue, Fun}, State) -> - noreply(run_backing_queue(Fun, State)); +handle_cast({run_backing_queue, Mod, Fun}, State) -> + noreply(run_backing_queue(Mod, Fun, State)); handle_cast(sync_timeout, State) -> noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); @@ -1028,7 +1062,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - BQS1 = BQ:ack(AckTags, BQS), + {_Guids, BQS1} = BQ:ack(AckTags, BQS), {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, @@ -1049,7 +1083,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), + false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), State#q{backing_queue_state = BQS1} end) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 0ca8d260..0955a080 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -35,19 +35,18 @@ behaviour_info(callbacks) -> %% Initialise the backing queue and its state. %% %% Takes - %% 1. the queue name - %% 2. a boolean indicating whether the queue is durable - %% 3. a boolean indicating whether the queue is an existing queue + %% 1. the amqqueue record + %% 2. a boolean indicating whether the queue is an existing queue %% that should be recovered - %% 4. an asynchronous callback which accepts a function of type + %% 3. an asynchronous callback which accepts a function of type %% backing-queue-state to backing-queue-state. This callback %% function can be safely invoked from any process, which %% makes it useful for passing messages back into the backing %% queue, especially as the backing queue does not have %% control of its own mailbox. - %% 5. a synchronous callback. Same as the asynchronous callback + %% 4. a synchronous callback. Same as the asynchronous callback %% but waits for completion and returns 'error' on error. - {init, 5}, + {init, 4}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 1}, @@ -61,12 +60,12 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 3}, + {publish, 4}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 4}, + {publish_delivered, 5}, %% Return ids of messages which have been confirmed since %% the last invocation of this function (or initialisation). @@ -109,7 +108,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 4}, + {tx_publish, 5}, %% Acks, but in the context of a transaction. {tx_ack, 3}, @@ -165,7 +164,25 @@ behaviour_info(callbacks) -> %% Exists for debugging purposes, to be able to expose state via %% rabbitmqctl list_queues backing_queue_status - {status, 1} + {status, 1}, + + %% Passed a function to be invoked with the relevant backing + %% queue's state. Useful for when the backing queue or other + %% components need to pass functions into the backing queue. + {invoke, 3}, + + %% Called prior to a publish or publish_delivered call. Allows + %% the BQ to signal that it's already seen this message (and in + %% what capacity - i.e. was it published previously or discarded + %% previously) and thus the message should be dropped. + {is_duplicate, 3}, + + %% Called to inform the BQ about messages which have reached the + %% queue, but are not going to be further passed to BQ for some + %% reason. Note that this is may be invoked for messages for + %% which BQ:is_duplicate/2 has already returned {'published' | + %% 'discarded', BQS}. + {discard, 3} ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 294fae97..2ef07071 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2072,9 +2072,9 @@ test_queue_index() -> passed. -variable_queue_init(QName, IsDurable, Recover) -> - rabbit_variable_queue:init(QName, IsDurable, Recover, - fun nop/1, fun nop/1, fun nop/2, fun nop/1). +variable_queue_init(Q, Recover) -> + rabbit_variable_queue:init( + Q, Recover, fun nop/1, fun nop/1, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> lists:foldl( @@ -2086,7 +2086,7 @@ variable_queue_publish(IsPersistent, Count, VQ) -> true -> 2; false -> 1 end}, <<>>), - #message_properties{}, VQN) + #message_properties{}, self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2104,9 +2104,13 @@ assert_prop(List, Prop, Value) -> assert_props(List, PropVals) -> [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. +test_amqqueue(Durable) -> + (rabbit_amqqueue:pseudo_queue(test_queue(), self())) + #amqqueue { durable = Durable }. + with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = variable_queue_init(test_queue(), true, false), + VQ = variable_queue_init(test_amqqueue(true), false), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -2164,7 +2168,7 @@ test_dropwhile(VQ0) -> rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, VQN) + #message_properties{expiry = N}, self(), VQN) end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages @@ -2208,7 +2212,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2218,7 +2222,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2252,7 +2256,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2281,7 +2285,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = variable_queue_init(test_queue(), true, true), + VQ7 = variable_queue_init(test_amqqueue(true), true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2294,17 +2298,18 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), + {_Guids, VQ4} = + rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = variable_queue_init(test_queue(), true, true), + VQ7 = variable_queue_init(test_amqqueue(true), true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), - {new, #amqqueue { pid = QPid, name = QName }} = + {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), [begin Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), @@ -2328,7 +2333,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = variable_queue_init(QName, true, true), + VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ff7252fd..7a3c17a2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,18 +16,19 @@ -module(rabbit_variable_queue). --export([init/5, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, drain_confirmed/1, - fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, +-export([init/4, terminate/1, delete_and_terminate/1, + purge/1, publish/4, publish_delivered/5, drain_confirmed/1, + fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, multiple_routing_keys/0]). + status/1, invoke/3, is_duplicate/3, discard/3, + multiple_routing_keys/0]). -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/7]). +-export([start_msg_store/2, stop_msg_store/0, init/6]). %%---------------------------------------------------------------------------- %% Definitions: @@ -408,15 +409,15 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) -> - init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback, +init(Queue, Recover, AsyncCallback, SyncCallback) -> + init(Queue, Recover, AsyncCallback, SyncCallback, fun (MsgIds, ActionTaken) -> msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) end, fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). -init(QueueName, IsDurable, false, AsyncCallback, SyncCallback, - MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName, durable = IsDurable }, false, + AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback, case IsDurable of @@ -426,8 +427,8 @@ init(QueueName, IsDurable, false, AsyncCallback, SyncCallback, end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(QueueName, true, true, AsyncCallback, SyncCallback, - MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName, durable = true }, true, + AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -517,13 +518,14 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProps, State) -> +publish(Msg, MsgProps, _ChPid, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). publish_delivered(false, #basic_message { id = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, - State = #vqstate { async_callback = Callback, len = 0 }) -> + _ChPid, State = #vqstate { async_callback = Callback, + len = 0 }) -> case NeedsConfirming of true -> blind_confirm(Callback, gb_sets:singleton(MsgId)); false -> ok @@ -533,13 +535,13 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - State = #vqstate { len = 0, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - unconfirmed = UC }) -> + _ChPid, State = #vqstate { len = 0, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, @@ -665,13 +667,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - a(ack(fun msg_store_remove/3, - fun (_, State0) -> State0 end, - AckTags, State)). + {MsgIds, State1} = ack(fun msg_store_remove/3, + fun (_, State0) -> State0 end, + AckTags, State), + {MsgIds, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> + _ChPid, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), case IsPersistent andalso IsDurable of @@ -727,7 +730,7 @@ requeue(AckTags, MsgPropsFun, State) -> (MsgPropsFun(MsgProps)) #message_properties { needs_confirming = false } end, - a(reduce_memory_use( + {MsgIds, State1} = ack(fun (_, _, _) -> ok end, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), @@ -742,7 +745,8 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State))). + AckTags, State), + {MsgIds, a(reduce_memory_use(State1))}. len(#vqstate { len = Len }) -> Len. @@ -880,6 +884,13 @@ status(#vqstate { {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]. +invoke(?MODULE, Fun, State) -> + Fun(?MODULE, State). + +is_duplicate(_Txn, _Msg, State) -> {false, State}. + +discard(_Msg, _ChPid, State) -> State. + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- @@ -954,8 +965,8 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store:client_init( - MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end). + rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun, + fun () -> Callback(?MODULE, CloseFDsFun) end). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( @@ -983,7 +994,7 @@ msg_store_close_fds(MSCState, IsPersistent) -> fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). msg_store_close_fds_fun(IsPersistent) -> - fun (State = #vqstate { msg_store_clients = MSCState }) -> + fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) -> {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), State #vqstate { msg_store_clients = MSCState1 } end. @@ -1129,7 +1140,8 @@ blank_rate(Timestamp, IngressLength) -> msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, AsyncCallback, SyncCallback) -> - case SyncCallback(fun (StateN) -> + case SyncCallback(?MODULE, + fun (?MODULE, StateN) -> tx_commit_post_msg_store(true, Pubs, AckTags, Fun, MsgPropsFun, StateN) end) of @@ -1192,20 +1204,21 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Acks = lists:append(SAcks), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], - {SeqIds, State1 = #vqstate { index_state = IndexState }} = + {_MsgIds, State1} = ack(Acks, State), + {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProps}, - {SeqIdsAcc, State2}) -> + {SeqIdsAcc, State3}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = - publish(Msg, MsgProps, false, IsPersistent1, State2), - {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + {SeqId, State4} = + publish(Msg, MsgProps, false, IsPersistent1, State3), + {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4} + end, {PAcks, State1}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( - State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). + State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, @@ -1352,7 +1365,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, MsgIdsByStore} = + {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1371,9 +1384,9 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, MsgIdsByStore}, + {{PersistentSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1393,21 +1406,24 @@ ack(MsgStoreFun, Fun, AckTags, State) -> || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( orddict:new(), MsgIdsByStore)), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }. + {lists:reverse(AllMsgIds), + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }}. -accumulate_ack_init() -> {[], orddict:new()}. +accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false }, - {PersistentSeqIdsAcc, MsgIdsByStore}) -> - {PersistentSeqIdsAcc, MsgIdsByStore}; + index_on_disk = false, + msg_id = MsgId }, + {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> + {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps}, - {PersistentSeqIdsAcc, MsgIdsByStore}) -> + {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}. + rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), + [MsgId | AllMsgIds]}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1451,14 +1467,16 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). blind_confirm(Callback, MsgIdSet) -> - Callback(fun (State) -> record_confirms(MsgIdSet, State) end). + Callback(?MODULE, + fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). msgs_written_to_disk(Callback, MsgIdSet, removed) -> blind_confirm(Callback, MsgIdSet); msgs_written_to_disk(Callback, MsgIdSet, written) -> - Callback(fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + Callback(?MODULE, + fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> Confirmed = gb_sets:intersection(UC, MsgIdSet), record_confirms(gb_sets:intersection(MsgIdSet, MIOD), State #vqstate { @@ -1467,9 +1485,10 @@ msgs_written_to_disk(Callback, MsgIdSet, written) -> end). msg_indices_written_to_disk(Callback, MsgIdSet) -> - Callback(fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + Callback(?MODULE, + fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> Confirmed = gb_sets:intersection(UC, MsgIdSet), record_confirms(gb_sets:intersection(MsgIdSet, MOD), State #vqstate { |