diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-12-04 14:00:12 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-12-04 14:00:12 +0000 |
commit | db9999bd68df6cbd541b5c43167b13db47b9f2fd (patch) | |
tree | 80bb31b61a80c30e2bd18ea9c55eafb69053eefb | |
parent | 5c30e7cf1fd1502ed5b30da8a6617705f27cd34f (diff) | |
parent | 89f60604808dbbf7e0d6ea88192bf68e4cafe113 (diff) | |
download | rabbitmq-server-db9999bd68df6cbd541b5c43167b13db47b9f2fd.tar.gz |
stable to default
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 26 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 103 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 59 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 112 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 148 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 22 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 134 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 25 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 168 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 187 |
12 files changed, 607 insertions, 388 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index b2832b45..0ccb80bf 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -78,8 +78,7 @@ -record(event, {type, props, timestamp}). --record(message_properties, {expiry, needs_confirming = false, - delivered = false}). +-record(message_properties, {expiry, needs_confirming = false}). -record(plugin, {name, %% atom() version, %% string() diff --git a/src/rabbit.erl b/src/rabbit.erl index c3a6d283..7b8348fc 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -400,13 +400,11 @@ status() -> is_running() -> is_running(node()). -is_running(Node) -> - rabbit_nodes:is_running(Node, rabbit). +is_running(Node) -> rabbit_nodes:is_running(Node, rabbit). environment() -> - lists:keysort( - 1, [P || P = {K, _} <- application:get_all_env(rabbit), - K =/= default_pass]). + lists:keysort(1, [P || P = {K, _} <- application:get_all_env(rabbit), + K =/= default_pass]). rotate_logs(BinarySuffix) -> Suffix = binary_to_list(BinarySuffix), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 173f7648..1b6cc223 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,7 +34,7 @@ -export([start_mirroring/1, stop_mirroring/1]). %% internal --export([internal_declare/2, internal_delete/2, run_backing_queue/3, +-export([internal_declare/2, internal_delete/1, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -156,11 +156,11 @@ -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). --spec(internal_delete/2 :: - (name(), pid()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit() | - fun (() -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). +-spec(internal_delete/1 :: + (name()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit() | + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -257,7 +257,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName, QPid), + false -> TailFun = internal_delete(QueueName), fun () -> TailFun(), ExistingQ end end end @@ -569,7 +569,7 @@ internal_delete1(QueueName) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName). -internal_delete(QueueName, QPid) -> +internal_delete(QueueName) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of @@ -579,8 +579,7 @@ internal_delete(QueueName, QPid) -> fun() -> ok = T(), ok = rabbit_event:notify(queue_deleted, - [{pid, QPid}, - {name, QueueName}]) + [{name, QueueName}]) end end end). @@ -600,7 +599,7 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = - qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || + qlc:e(qlc:q([{QName, delete_queue(QName)} || #amqqueue{name = QName, pid = Pid, slave_pids = []} <- mnesia:table(rabbit_queue), @@ -613,10 +612,9 @@ on_node_down(Node) -> fun () -> T(), lists:foreach( - fun({QName, QPid}) -> + fun(QName) -> ok = rabbit_event:notify(queue_deleted, - [{pid, QPid}, - {name, QName}]) + [{name, QName}]) end, Qs) end end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f4459e45..2ffa2a1a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -85,7 +85,7 @@ %%---------------------------------------------------------------------------- -define(STATISTICS_KEYS, - [pid, + [name, policy, exclusive_consumer_pid, exclusive_consumer_tag, @@ -101,16 +101,14 @@ ]). -define(CREATION_EVENT_KEYS, - [pid, - name, + [name, durable, auto_delete, arguments, owner_pid ]). --define(INFO_KEYS, - ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]). %%---------------------------------------------------------------------------- @@ -182,7 +180,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName}, fun (BQS) -> BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete doesn't return 'ok'. - rabbit_amqqueue:internal_delete(QName, self()), + rabbit_amqqueue:internal_delete(QName), BQS1 end, State). @@ -278,7 +276,8 @@ terminate_shutdown(Fun, State) -> case BQS of undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), - [emit_consumer_deleted(Ch, CTag) + QName = qname(State), + [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _} <- consumers(State1)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -486,11 +485,10 @@ deliver_msg_to_consumer(DeliverFun, {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> - {{Message, IsDelivered, AckTag, _Remaining}, State1} = - fetch(AckRequired, State), + {Result, State1} = fetch(AckRequired, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State1), - {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}. + {Result, BQ:is_empty(BQS), State2}. confirm_messages([], State) -> State; @@ -543,8 +541,8 @@ run_message_queue(State) -> State2. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, - Props = #message_properties{delivered = Delivered}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Props, Delivered, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> deliver_msgs_to_consumers( @@ -566,15 +564,15 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), - Props = message_properties(Message, Confirm, Delivered, State), - case attempt_delivery(Delivery, Props, State1) of + Props = message_properties(Message, Confirm, State), + case attempt_delivery(Delivery, Props, Delivered, State1) of {true, State2} -> State2; %% The next one is an optimisation {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) end. @@ -606,9 +604,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) -> (CP /= ChPid) or (CTag /= ConsumerTag) end, Queue). -remove_consumers(ChPid, Queue) -> +remove_consumers(ChPid, Queue, QName) -> queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> - emit_consumer_deleted(ChPid, CTag), + emit_consumer_deleted(ChPid, CTag, QName), false; (_) -> true @@ -649,7 +647,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> - _ = remove_consumers(ChPid, Blocked), %% for stats emission + QName = qname(State), + _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of @@ -657,7 +656,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers), + ChPid, State#q.active_consumers, + QName), senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; @@ -705,10 +705,9 @@ subtract_acks(ChPid, AckTags, State, Fun) -> Fun(State) end. -message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) -> +message_properties(Message, Confirm, #q{ttl = TTL}) -> #message_properties{expiry = calculate_msg_expiry(Message, TTL), - needs_confirming = Confirm == eventually, - delivered = Delivered}. + needs_confirming = Confirm == eventually}. calculate_msg_expiry(#basic_message{content = Content}, TTL) -> #content{properties = Props} = @@ -726,13 +725,16 @@ drop_expired_messages(State = #q{dlx = DLX, Now = now_micros(), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, {Props, BQS1} = case DLX of - undefined -> {Next, undefined, BQS2} = - BQ:dropwhile(ExpirePred, false, BQS), - {Next, BQS2}; - _ -> {Next, Msgs, BQS2} = - BQ:dropwhile(ExpirePred, true, BQS), - DLXFun = dead_letter_fun(expired), - DLXFun(Msgs), + undefined -> BQ:dropwhile(ExpirePred, BQS); + _ -> {Next, Msgs, BQS2} = + BQ:fetchwhile(ExpirePred, + fun accumulate_msgs/4, + [], BQS), + case Msgs of + [] -> ok; + _ -> (dead_letter_fun(expired))( + lists:reverse(Msgs)) + end, {Next, BQS2} end, ensure_ttl_timer(case Props of @@ -740,6 +742,8 @@ drop_expired_messages(State = #q{dlx = DLX, #message_properties{expiry = Exp} -> Exp end, State#q{backing_queue_state = BQS1}). +accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc]. + ensure_ttl_timer(undefined, State) -> State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> @@ -954,19 +958,19 @@ emit_stats(State) -> emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). -emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> +emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) -> rabbit_event:notify(consumer_created, [{consumer_tag, ConsumerTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, {channel, ChPid}, - {queue, self()}]). + {queue, QName}]). -emit_consumer_deleted(ChPid, ConsumerTag) -> +emit_consumer_deleted(ChPid, ConsumerTag, QName) -> rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag}, {channel, ChPid}, - {queue, self()}]). + {queue, QName}]). %%---------------------------------------------------------------------------- @@ -1060,8 +1064,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, case fetch(AckRequired, drop_expired_messages(State1)) of {empty, State2} -> reply(empty, State2); - {{Message, IsDelivered, AckTag, Remaining}, State2} -> - State3 = + {{Message, IsDelivered, AckTag}, State2} -> + State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), ChAckTags1 = sets:add_element(AckTag, ChAckTags), @@ -1070,14 +1074,13 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> State2 end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State3) + reply({ok, BQ:len(BQS), Msg}, State3) end; handle_call({basic_consume, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{exclusive_consumer = ExistingHolder}) -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume, - State) of + _From, State = #q{exclusive_consumer = Holder}) -> + case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> @@ -1086,7 +1089,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder + true -> Holder end, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, @@ -1101,7 +1104,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, run_message_queue(State1#q{active_consumers = AC1}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck), + not NoAck, qname(State2)), reply(ok, State2) end; @@ -1112,7 +1115,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, not_found -> reply(ok, State); C = #cr{blocked_consumers = Blocked} -> - emit_consumer_deleted(ChPid, ConsumerTag), + emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), State1 = State#q{ @@ -1122,7 +1125,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, end, active_consumers = remove_consumer( ChPid, ConsumerTag, - State#q.active_consumers)}, + State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(From, ok, State1) @@ -1136,11 +1139,11 @@ handle_call(stat, _From, State) -> handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - IsEmpty = BQ:is_empty(BQS), + IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if - IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); - IfUnused and not(IsUnused) -> reply({error, in_use}, State); + IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); + IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> stop(From, {ok, BQ:len(BQS)}, State) end; @@ -1156,11 +1159,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + QName = qname(State), case Exclusive of - none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || + none -> [emit_consumer_created( + Ch, CTag, false, AckRequired, QName) || {Ch, CTag, AckRequired} <- consumers(State)]; {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), - emit_consumer_created(Ch, CTag, true, AckRequired) + emit_consumer_created(Ch, CTag, true, AckRequired, QName) end, reply(ok, State). @@ -1208,7 +1213,7 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end, + BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end, BQS, AckTags), State1#q{backing_queue_state = BQS1} end)); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index af660c60..272df5c1 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -26,9 +26,9 @@ -type(msg_ids() :: [rabbit_types:msg_id()]). -type(fetch_result(Ack) :: - ('empty' | - %% Message, IsDelivered, AckTag, Remaining_Len - {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). + ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). +-type(drop_result(Ack) :: + ('empty' | {rabbit_types:msg_id(), Ack})). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(async_callback() :: @@ -78,8 +78,8 @@ %% Publish a message. -callback publish(rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) -> - state(). + rabbit_types:message_properties(), boolean(), pid(), + state()) -> state(). %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls @@ -124,33 +124,53 @@ %% be ignored. -callback drain_confirmed(state()) -> {msg_ids(), state()}. -%% Drop messages from the head of the queue while the supplied predicate returns -%% true. Also accepts a boolean parameter that determines whether the messages -%% necessitate an ack or not. If they do, the function returns a list of -%% messages with the respective acktags. --callback dropwhile(msg_pred(), true, state()) - -> {rabbit_types:message_properties() | undefined, - [{rabbit_types:basic_message(), ack()}], state()}; - (msg_pred(), false, state()) - -> {rabbit_types:message_properties() | undefined, - undefined, state()}. +%% Drop messages from the head of the queue while the supplied +%% predicate on message properties returns true. Returns the first +%% message properties for which the predictate returned false, or +%% 'undefined' if the whole backing queue was traversed w/o the +%% predicate ever returning false. +-callback dropwhile(msg_pred(), state()) + -> {rabbit_types:message_properties() | undefined, state()}. + +%% Like dropwhile, except messages are fetched in "require +%% acknowledgement" mode and are passed, together with their Delivered +%% flag and ack tag, to the supplied function. The function is also +%% fed an accumulator. The result of fetchwhile is as for dropwhile +%% plus the accumulator. +-callback fetchwhile(msg_pred(), + fun ((rabbit_types:basic_message(), boolean(), ack(), A) + -> A), + A, state()) + -> {rabbit_types:message_properties() | undefined, + A, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}. +%% Remove the next message. +-callback drop(true, state()) -> {drop_result(ack()), state()}; + (false, state()) -> {drop_result(undefined), state()}. + %% Acktags supplied are for messages which can now be forgotten %% about. Must return 1 msg_id per Ack, in the same order as Acks. -callback ack([ack()], state()) -> {msg_ids(), state()}. %% Acktags supplied are for messages which should be processed. The %% provided callback function is called with each message. --callback fold(msg_fun(), state(), [ack()]) -> state(). +-callback foreach_ack(msg_fun(), state(), [ack()]) -> state(). %% Reinsert messages into the queue which have already been delivered %% and were pending acknowledgement. -callback requeue([ack()], state()) -> {msg_ids(), state()}. +%% Fold over all the messages in a queue and return the accumulated +%% results, leaving the queue undisturbed. +-callback fold(fun((rabbit_types:basic_message(), + rabbit_types:message_properties(), + A) -> {('stop' | 'cont'), A}), + A, state()) -> {A, state()}. + %% How long is my queue? -callback len(state()) -> non_neg_integer(). @@ -210,9 +230,10 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, - {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, - {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, + {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, + {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, + {dropwhile, 2}, {fetchwhile, 4}, + {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index b37fbb29..5b3b8aa8 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -85,17 +85,19 @@ backing_queue_test(Cmds) -> %% Commands -%% Command frequencies are tuned so that queues are normally reasonably -%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple -%% and purging cause extreme queue lengths, so these have lower probabilities. -%% Fetches are sufficiently frequent so that commands that need acktags -%% get decent coverage. +%% Command frequencies are tuned so that queues are normally +%% reasonably short, but they may sometimes exceed +%% ?QUEUE_MAXLEN. Publish-multiple and purging cause extreme queue +%% lengths, so these have lower probabilities. Fetches/drops are +%% sufficiently frequent so that commands that need acktags get decent +%% coverage. command(S) -> frequency([{10, qc_publish(S)}, {1, qc_publish_delivered(S)}, {1, qc_publish_multiple(S)}, %% very slow - {15, qc_fetch(S)}, %% needed for ack and requeue + {9, qc_fetch(S)}, %% needed for ack and requeue + {6, qc_drop(S)}, %% {15, qc_ack(S)}, {15, qc_requeue(S)}, {3, qc_set_ram_duration_target(S)}, @@ -104,7 +106,8 @@ command(S) -> {1, qc_dropwhile(S)}, {1, qc_is_empty(S)}, {1, qc_timeout(S)}, - {1, qc_purge(S)}]). + {1, qc_purge(S)}, + {1, qc_fold(S)}]). qc_publish(#state{bqstate = BQ}) -> {call, ?BQMOD, publish, @@ -112,7 +115,7 @@ qc_publish(#state{bqstate = BQ}) -> #message_properties{needs_confirming = frequency([{1, true}, {20, false}]), expiry = oneof([undefined | lists:seq(1, 10)])}, - self(), BQ]}. + false, self(), BQ]}. qc_publish_multiple(#state{}) -> {call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}. @@ -124,6 +127,9 @@ qc_publish_delivered(#state{bqstate = BQ}) -> qc_fetch(#state{bqstate = BQ}) -> {call, ?BQMOD, fetch, [boolean(), BQ]}. +qc_drop(#state{bqstate = BQ}) -> + {call, ?BQMOD, drop, [boolean(), BQ]}. + qc_ack(#state{bqstate = BQ, acks = Acks}) -> {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}. @@ -141,7 +147,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) -> {call, ?BQMOD, drain_confirmed, [BQ]}. qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}. + {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}. qc_is_empty(#state{bqstate = BQ}) -> {call, ?BQMOD, is_empty, [BQ]}. @@ -152,6 +158,9 @@ qc_timeout(#state{bqstate = BQ}) -> qc_purge(#state{bqstate = BQ}) -> {call, ?BQMOD, purge, [BQ]}. +qc_fold(#state{bqstate = BQ}) -> + {call, ?BQMOD, fold, [makefoldfun(pos_integer()), foldacc(), BQ]}. + %% Preconditions %% Create long queues by only allowing publishing @@ -173,7 +182,7 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) -> %% Model updates -next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> +next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Del, _Pid, _BQ]}) -> #state{len = Len, messages = Messages, confirms = Confirms, @@ -217,22 +226,10 @@ next_state(S, Res, }; next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) -> - #state{len = Len, messages = Messages, acks = Acks} = S, - ResultInfo = {call, erlang, element, [1, Res]}, - BQ1 = {call, erlang, element, [2, Res]}, - AckTag = {call, erlang, element, [3, ResultInfo]}, - S1 = S#state{bqstate = BQ1}, - case gb_trees:is_empty(Messages) of - true -> S1; - false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages), - S2 = S1#state{len = Len - 1, messages = M2}, - case AckReq of - true -> - S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]}; - false -> - S2 - end - end; + next_state_fetch_and_drop(S, Res, AckReq, 3); + +next_state(S, Res, {call, ?BQMOD, drop, [AckReq, _BQ]}) -> + next_state_fetch_and_drop(S, Res, AckReq, 2); next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> #state{acks = AcksState} = S, @@ -265,7 +262,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> S#state{bqstate = BQ1}; next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) -> - BQ = {call, erlang, element, [3, Res]}, + BQ = {call, erlang, element, [2, Res]}, #state{messages = Messages} = S, Msgs1 = drop_messages(Messages), S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1}; @@ -278,19 +275,38 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) -> next_state(S, Res, {call, ?BQMOD, purge, _Args}) -> BQ1 = {call, erlang, element, [2, Res]}, - S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}. + S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}; + +next_state(S, Res, {call, ?BQMOD, fold, _Args}) -> + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1}. %% Postconditions postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, case Res of - {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} -> + {{MsgFetched, _IsDelivered, AckTag}, _BQ} -> {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), MsgFetched =:= Msg andalso not proplists:is_defined(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms) andalso - RemainingLen =:= Len - 1; + Len =/= 0; + {empty, _BQ} -> + Len =:= 0 + end; + +postcondition(S, {call, ?BQMOD, drop, _Args}, Res) -> + #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, + case Res of + {{MsgIdFetched, AckTag}, _BQ} -> + {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), + MsgId = eval({call, erlang, element, + [?RECORD_INDEX(id, basic_message), Msg]}), + MsgIdFetched =:= MsgId andalso + not proplists:is_defined(AckTag, Acks) andalso + not gb_sets:is_element(AckTag, Confrms) andalso + Len =/= 0; {empty, _BQ} -> Len =:= 0 end; @@ -313,6 +329,15 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) -> lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end, ReportedConfirmed); +postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) -> + #state{messages = Messages} = S, + {_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) -> + {stop, Acc}; + ({_SeqId, {MsgProps, Msg}}, {cont, Acc}) -> + FoldFun(Msg, MsgProps, Acc) + end, {cont, Acc0}, gb_trees:to_list(Messages)), + true = Model =:= Res; + postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> ?BQMOD:len(BQ) =:= Len. @@ -371,6 +396,15 @@ rand_choice(List, Selection, N) -> rand_choice(List -- [Picked], [Picked | Selection], N - 1). +makefoldfun(Size) -> + fun (Msg, _MsgProps, Acc) -> + case length(Acc) > Size of + false -> {cont, [Msg | Acc]}; + true -> {stop, Acc} + end + end. +foldacc() -> []. + dropfun(Props) -> Expiry = eval({call, erlang, element, [?RECORD_INDEX(expiry, message_properties), Props]}), @@ -388,6 +422,24 @@ drop_messages(Messages) -> end end. +next_state_fetch_and_drop(S, Res, AckReq, AckTagIdx) -> + #state{len = Len, messages = Messages, acks = Acks} = S, + ResultInfo = {call, erlang, element, [1, Res]}, + BQ1 = {call, erlang, element, [2, Res]}, + AckTag = {call, erlang, element, [AckTagIdx, ResultInfo]}, + S1 = S#state{bqstate = BQ1}, + case gb_trees:is_empty(Messages) of + true -> S1; + false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages), + S2 = S1#state{len = Len - 1, messages = M2}, + case AckReq of + true -> + S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]}; + false -> + S2 + end + end. + -else. -export([prop_disabled/0]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b97af6d8..a3c82865 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,8 +35,9 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, - virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, delivering_queues, + virtual_host, most_recently_declared_queue, + queue_names, queue_monitors, consumer_mapping, + blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). @@ -194,6 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, + queue_names = dict:new(), queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), @@ -334,9 +336,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), - erase_queue_stats(QPid), - noreply(State4#ch{queue_monitors = pmon:erase( - QPid, State4#ch.queue_monitors)}); + #ch{queue_names = QNames, queue_monitors = QMons} = State4, + case dict:find(QPid, QNames) of + {ok, QName} -> erase_queue_stats(QName); + error -> ok + end, + noreply(State4#ch{queue_names = dict:erase(QPid, QNames), + queue_monitors = pmon:erase(QPid, QMons)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -523,16 +529,12 @@ check_not_default_exchange(_) -> %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% -%% One, quite reasonable, interpretation of the spec, taken by the -%% QPid M1 Java client, is that the exclusion of "amq." prefixed names +%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names %% only applies on actual creation, and not in the cases where the -%% entity already exists. This is how we use this function in the code -%% below. However, AMQP JIRA 123 changes that in 0-10, and possibly -%% 0-9SP1, making it illegal to attempt to declare an exchange/queue -%% with an amq.* name when passive=false. So this will need -%% revisiting. +%% entity already exists or passive=true. %% -%% TODO: enforce other constraints on name. See AMQP JIRA 69. +%% NB: We deliberately do not enforce the other constraints on names +%% required by the spec. check_name(Kind, NameBin = <<"amq.", _/binary>>) -> rabbit_misc:protocol_error( access_refused, @@ -677,7 +679,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, QPid, _MsgId, Redelivered, + Msg = {QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -689,7 +691,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - State1 = monitor_delivering_queue(NoAck, QPid, State), + State1 = monitor_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} @@ -728,10 +730,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q = #amqqueue{pid = QPid}} -> + {ok, Q = #amqqueue{pid = QPid, name = QName}} -> CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), State1 = monitor_delivering_queue( - NoAck, QPid, State#ch{consumer_mapping = CM1}), + NoAck, QPid, QName, + State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1126,9 +1129,12 @@ consumer_monitor(ConsumerTag, State end. -monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons, - delivering_queues = DQ}) -> - State#ch{queue_monitors = pmon:monitor(QPid, QMons), +monitor_delivering_queue(NoAck, QPid, QName, + State = #ch{queue_names = QNames, + queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_names = dict:store(QPid, QName, QNames), + queue_monitors = pmon:monitor(QPid, QMons), delivering_queues = case NoAck of true -> DQ; false -> sets:add_element(QPid, DQ) @@ -1233,18 +1239,18 @@ reject(Requeue, Acked, Limiter) -> ok = notify_limiter(Limiter, Acked). record_sent(ConsumerTag, AckRequired, - Msg = {_QName, QPid, MsgId, Redelivered, _Message}, + Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, trace_state = TraceState}) -> - incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State); + true -> incr_stats([{queue_stats, QName, 1}], redeliver, State); false -> ok end, rabbit_trace:tap_trace_out(Msg, TraceState), @@ -1277,11 +1283,15 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. -ack(Acked, State) -> +ack(Acked, State = #ch{queue_names = QNames}) -> Incs = fold_per_queue( fun (QPid, MsgIds, L) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - [{queue_stats, QPid, length(MsgIds)} | L] + case dict:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + [{queue_stats, QName, Count} | L]; + error -> L + end end, [], Acked), ok = notify_limiter(State#ch.limiter, Acked), incr_stats(Incs, ack, State). @@ -1339,23 +1349,42 @@ notify_limiter(Limiter, Acked) -> deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, msg_seq_no = MsgSeqNo}, - QNames}, State) -> - {RoutingRes, DeliveredQPids} = - rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = State#ch{queue_monitors = - pmon:monitor_all(DeliveredQPids, - State#ch.queue_monitors)}, - State2 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, State1), + DelQNames}, State = #ch{queue_names = QNames, + queue_monitors = QMons}) -> + Qs = rabbit_amqqueue:lookup(DelQNames), + {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery), + %% The pmon:monitor_all/2 monitors all queues to which we + %% delivered. But we want to monitor even queues we didn't deliver + %% to, since we need their 'DOWN' messages to clean + %% queue_names. So we also need to monitor each QPid from + %% queues. But that only gets the masters (which is fine for + %% cleaning queue_names), so we need the union of both. + %% + %% ...and we need to add even non-delivered queues to queue_names + %% since alternative algorithms to update queue_names less + %% frequently would in fact be more expensive in the common case. + {QNames1, QMons1} = + lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, + {QNames0, QMons0}) -> + {case dict:is_key(QPid, QNames0) of + true -> QNames0; + false -> dict:store(QPid, QName, QNames0) + end, pmon:monitor(QPid, QMons0)} + end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), + State1 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, + State#ch{queue_names = QNames1, + queue_monitors = QMons1}), incr_stats([{exchange_stats, XName, 1} | - [{queue_exchange_stats, {QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State2), - State2. + [{queue_exchange_stats, {QName, XName}, 1} || + QPid <- DeliveredQPids, + {ok, QName} <- [dict:find(QPid, QNames1)]]], + publish, State1), + State1. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}], - return_unroutable, State), + incr_stats([{exchange_stats, XName, 1}], return_unroutable, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1489,24 +1518,23 @@ emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> - CoarseStats = infos(?STATISTICS_KEYS, State), + Coarse = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(State, #ch.stats_timer) of - coarse -> - rabbit_event:notify(channel_stats, Extra ++ CoarseStats); - fine -> - FineStats = - [{channel_queue_stats, - [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, - {channel_exchange_stats, - [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, - {channel_queue_exchange_stats, - [{QX, Stats} || - {{queue_exchange_stats, QX}, Stats} <- get()]}], - rabbit_event:notify(channel_stats, - Extra ++ CoarseStats ++ FineStats) + coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse); + fine -> Fine = [{channel_queue_stats, + [{QName, Stats} || + {{queue_stats, QName}, Stats} <- get()]}, + {channel_exchange_stats, + [{XName, Stats} || + {{exchange_stats, XName}, Stats} <- get()]}, + {channel_queue_exchange_stats, + [{QX, Stats} || + {{queue_exchange_stats, QX}, Stats} <- get()]}], + rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine) end. -erase_queue_stats(QPid) -> - erase({queue_stats, QPid}), +erase_queue_stats(QName) -> + erase({queue_stats, QName}), [erase({queue_exchange_stats, QX}) || - {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), + QName0 =:= QName]. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a205b23d..e72cbafe 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -39,8 +39,7 @@ -spec(recover/0 :: () -> [name()]). -spec(callback/4:: (rabbit_types:exchange(), fun_name(), - fun((boolean()) -> non_neg_integer()) | atom(), - [any()]) -> 'ok'). + fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'). -spec(declare/6 :: @@ -114,26 +113,19 @@ recover() -> [XName || #exchange{name = XName} <- Xs]. callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> - Serial = fun (Bool) -> - case Serial0 of - _ when is_atom(Serial0) -> Serial0; - _ -> Serial0(Bool) - end + Serial = if is_function(Serial0) -> Serial0; + is_atom(Serial0) -> fun (_Bool) -> Serial0 end end, - [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) - || M <- decorators()], + [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) || + M <- decorators()], Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]). serialise_events(X = #exchange{type = Type}) -> - case [Serialise || M <- decorators(), - Serialise <- [M:serialise_events(X)], - Serialise == true] of - [] -> (type_to_module(Type)):serialise_events(); - _ -> true - end. + lists:any(fun (M) -> M:serialise_events(X) end, decorators()) + orelse (type_to_module(Type)):serialise_events(). serial(#exchange{name = XName} = X) -> Serial = case serialise_events(X) of diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index df733546..e3d967bc 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,11 +17,12 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2, - requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, - dropwhile/3, set_ram_duration_target/2, ram_duration/1, + purge/1, publish/5, publish_delivered/4, + discard/3, fetch/2, drop/2, ack/2, + requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, + dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, fold/3]). + status/1, invoke/3, is_duplicate/2, foreach_ack/3]). -export([start/1, stop/0]). @@ -37,10 +38,8 @@ coordinator, backing_queue, backing_queue_state, - set_delivered, seen_status, confirmed, - ack_msg_id, known_senders }). @@ -54,10 +53,8 @@ coordinator :: pid(), backing_queue :: atom(), backing_queue_state :: any(), - set_delivered :: non_neg_integer(), seen_status :: dict(), confirmed :: [rabbit_guid:guid()], - ack_msg_id :: dict(), known_senders :: set() }). @@ -113,10 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = 0, seen_status = dict:new(), confirmed = [], - ack_msg_id = dict:new(), known_senders = sets:new() }. stop_mirroring(State = #state { coordinator = CPid, @@ -135,8 +130,8 @@ terminate({shutdown, dropped} = Reason, %% in without this node being restarted. Thus we must do the full %% blown delete_and_terminate now, but only locally: we do not %% broadcast delete_and_terminate. - State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), - set_delivered = 0 }; + State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}; + terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -147,8 +142,7 @@ terminate(Reason, delete_and_terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> stop_all_slaves(Reason, State), - State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), - set_delivered = 0 }. + State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. stop_all_slaves(Reason, #state{gm = GM}) -> Info = gm:info(GM), @@ -174,30 +168,27 @@ purge(State = #state { gm = GM, backing_queue_state = BQS }) -> ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}), {Count, BQS1} = BQ:purge(BQS), - {Count, State #state { backing_queue_state = BQS1, - set_delivered = 0 }}. + {Count, State #state { backing_queue_state = BQS1 }}. -publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, +publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}), - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM }) -> + backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}), {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), - AM1 = maybe_store_acktag(AckTag, MsgId, AM), - State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }, + State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. discard(MsgId, ChPid, State = #state { gm = GM, @@ -220,22 +211,17 @@ discard(MsgId, ChPid, State = #state { gm = GM, State end. -dropwhile(Pred, AckRequired, - State = #state{gm = GM, - backing_queue = BQ, - set_delivered = SetDelivered, - backing_queue_state = BQS }) -> +dropwhile(Pred, State = #state{backing_queue = BQ, + backing_queue_state = BQS }) -> Len = BQ:len(BQS), - {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), - Len1 = BQ:len(BQS1), - Dropped = Len - Len1, - case Dropped of - 0 -> ok; - _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}) - end, - SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - {Next, Msgs, State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1 } }. + {Next, BQS1} = BQ:dropwhile(Pred, BQS), + {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}. + +fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ, + backing_queue_state = BQS }) -> + Len = BQ:len(BQS), + {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS), + {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -267,43 +253,37 @@ drain_confirmed(State = #state { backing_queue = BQ, seen_status = SS1, confirmed = [] }}. -fetch(AckRequired, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - set_delivered = SetDelivered, - ack_msg_id = AM }) -> +fetch(AckRequired, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, - case Result of - empty -> - {Result, State1}; - {#basic_message { id = MsgId } = Message, IsDelivered, AckTag, - Remaining} -> - ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}), - IsDelivered1 = IsDelivered orelse SetDelivered > 0, - SetDelivered1 = lists:max([0, SetDelivered - 1]), - AM1 = maybe_store_acktag(AckTag, MsgId, AM), - {{Message, IsDelivered1, AckTag, Remaining}, - State1 #state { set_delivered = SetDelivered1, - ack_msg_id = AM1 }} - end. + {Result, case Result of + empty -> State1; + {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1) + end}. + +drop(AckRequired, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {Result, BQS1} = BQ:drop(AckRequired, BQS), + State1 = State #state { backing_queue_state = BQS1 }, + {Result, case Result of + empty -> State1; + {_MsgId, AckTag} -> drop_one(AckTag, State1) + end}. ack(AckTags, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM }) -> + backing_queue_state = BQS }) -> {MsgIds, BQS1} = BQ:ack(AckTags, BQS), case MsgIds of [] -> ok; _ -> ok = gm:broadcast(GM, {ack, MsgIds}) end, - AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), - {MsgIds, State #state { backing_queue_state = BQS1, - ack_msg_id = AM1 }}. + {MsgIds, State #state { backing_queue_state = BQS1 }}. -fold(MsgFun, State = #state { backing_queue = BQ, - backing_queue_state = BQS }, AckTags) -> - State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }. +foreach_ack(MsgFun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }, AckTags) -> + State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }. requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, @@ -312,6 +292,11 @@ requeue(AckTags, State = #state { gm = GM, ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. +fold(Fun, Acc, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {Result, BQS1} = BQ:fold(Fun, Acc, BQS), + {Result, State #state { backing_queue_state = BQS1 }}. + len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:len(BQS). @@ -409,10 +394,8 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS1, - set_delivered = Len, seen_status = SeenStatus, confirmed = [], - ack_msg_id = dict:new(), known_senders = sets:from_list(KS) }. sender_death_fun() -> @@ -440,8 +423,25 @@ depth_fun() -> end) end. -maybe_store_acktag(undefined, _MsgId, AM) -> AM; -maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM). +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +drop_one(AckTag, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}), + State. + +drop(PrevLen, AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + Len = BQ:len(BQS), + case PrevLen - Len of + 0 -> State; + Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}), + State + end. ensure_monitoring(ChPid, State = #state { coordinator = CPid, known_senders = KS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1ba1420f..9354f485 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -28,7 +28,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2]). + prioritise_cast/2, prioritise_info/2, format_message_queue/2]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -318,7 +318,6 @@ prioritise_cast(Msg, _State) -> {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; {gm, _Msg} -> 5; - {post_commit, _Txn, _AckTags} -> 4; _ -> 0 end. @@ -329,6 +328,8 @@ prioritise_info(Msg, _State) -> _ -> 0 end. +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + %% --------------------------------------------------------------------------- %% GM %% --------------------------------------------------------------------------- @@ -701,7 +702,7 @@ process_instruction({publish, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({publish_delivered, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> @@ -725,8 +726,7 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State1 = lists:foldl( fun (const, StateN = #state{backing_queue_state = BQSN}) -> - {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} = - BQ:fetch(AckRequired, BQSN), + {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN), maybe_store_ack( AckRequired, MsgId, AckTag, StateN #state { backing_queue_state = BQSN1 }) @@ -735,21 +735,6 @@ process_instruction({drop, Length, Dropped, AckRequired}, true -> State1; false -> update_delta(ToDrop - Dropped, State1) end}; -process_instruction({fetch, AckRequired, MsgId, Remaining}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - QLen = BQ:len(BQS), - {ok, case QLen - 1 of - Remaining -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }); - _ when QLen =< Remaining andalso AckRequired -> - State; - _ when QLen =< Remaining -> - update_delta(-1, State) - end}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 983abf29..2226f445 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1286,8 +1286,7 @@ test_statistics() -> QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, - {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), - QPid = Q#amqqueue.pid, + QRes = rabbit_misc:r(<<"/">>, queue, QName), X = rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]), @@ -1311,9 +1310,9 @@ test_statistics() -> length(proplists:get_value( channel_queue_exchange_stats, E)) > 0 end), - [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), + [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2), - [{{QPid,X},[{publish,1}]}] = + [{{QRes,X},[{publish,1}]}] = proplists:get_value(channel_queue_exchange_stats, Event2), %% Check the stats remove stuff on queue deletion @@ -1338,31 +1337,31 @@ test_refresh_events(SecondaryNode) -> [channel_created, queue_created]), {_Writer, Ch} = test_spawn(), - expect_events(Ch, channel_created), + expect_events(pid, Ch, channel_created), rabbit_channel:shutdown(Ch), {_Writer2, Ch2} = test_spawn(SecondaryNode), - expect_events(Ch2, channel_created), + expect_events(pid, Ch2, channel_created), rabbit_channel:shutdown(Ch2), - {new, #amqqueue { pid = QPid } = Q} = + {new, #amqqueue{name = QName} = Q} = rabbit_amqqueue:declare(test_queue(), false, false, [], none), - expect_events(QPid, queue_created), + expect_events(name, QName, queue_created), rabbit_amqqueue:delete(Q, false, false), rabbit_tests_event_receiver:stop(), passed. -expect_events(Pid, Type) -> - expect_event(Pid, Type), +expect_events(Tag, Key, Type) -> + expect_event(Tag, Key, Type), rabbit:force_event_refresh(), - expect_event(Pid, Type). + expect_event(Tag, Key, Type). -expect_event(Pid, Type) -> +expect_event(Tag, Key, Type) -> receive #event{type = Type, props = Props} -> - case pget(pid, Props) of - Pid -> ok; - _ -> expect_event(Pid, Type) + case pget(Tag, Props) of + Key -> ok; + _ -> expect_event(Tag, Key, Type) end after ?TIMEOUT -> throw({failed_to_receive_event, Type}) end. @@ -2217,6 +2216,10 @@ variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> + variable_queue_publish(IsPersistent, Count, PropFun, + fun (_N) -> <<>> end, VQ). + +variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) -> lists:foldl( fun (N, VQN) -> rabbit_variable_queue:publish( @@ -2225,16 +2228,18 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), - PropFun(N, #message_properties{}), self(), VQN) + end}, + PayloadFun(N)), + PropFun(N, #message_properties{}), false, self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - IsDelivered, AckTagN, Rem}, VQM} = + IsDelivered, AckTagN}, VQM} = rabbit_variable_queue:fetch(true, VQN), + Rem = rabbit_variable_queue:len(VQM), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -2300,13 +2305,40 @@ test_variable_queue() -> fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, + fun test_drop/1, fun test_variable_queue_fold_msg_on_disk/1, - fun test_dropwhile/1, + fun test_dropfetchwhile/1, fun test_dropwhile_varying_ram_duration/1, + fun test_fetchwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, - fun test_variable_queue_requeue/1]], + fun test_variable_queue_requeue/1, + fun test_variable_queue_fold/1]], passed. +test_variable_queue_fold(VQ0) -> + Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 64, + VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), + VQ2 = variable_queue_publish( + true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), + lists:foldl( + fun (Cut, VQ3) -> test_variable_queue_fold(Cut, Count, VQ3) end, + VQ2, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). + +test_variable_queue_fold(Cut, Count, VQ0) -> + {Acc, VQ1} = rabbit_variable_queue:fold( + fun (M, _, A) -> + case msg2int(M) =< Cut of + true -> {cont, [M | A]}; + false -> {stop, A} + end + end, [], VQ0), + true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] == + [msg2int(M) || M <- Acc], + VQ1. + +msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) -> + binary_to_term(list_to_binary(lists:reverse(P))). + test_variable_queue_requeue(VQ0) -> Interval = 50, Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval, @@ -2326,7 +2358,7 @@ test_variable_queue_requeue(VQ0) -> VQM end, VQ4, Subset), VQ6 = lists:foldl(fun (AckTag, VQa) -> - {{#basic_message{}, true, AckTag, _}, VQb} = + {{#basic_message{}, true, AckTag}, VQb} = rabbit_variable_queue:fetch(true, VQa), VQb end, VQ5, lists:reverse(Acks)), @@ -2362,41 +2394,86 @@ test_variable_queue_ack_limiting(VQ0) -> VQ6. -test_dropwhile(VQ0) -> +test_drop(VQ0) -> + %% start by sending a messages + VQ1 = variable_queue_publish(false, 1, VQ0), + %% drop message with AckRequired = true + {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1), + true = rabbit_variable_queue:is_empty(VQ2), + true = AckTag =/= undefinded, + %% drop again -> empty + {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2), + %% requeue + {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3), + %% drop message with AckRequired = false + {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4), + true = rabbit_variable_queue:is_empty(VQ5), + VQ5. + +test_dropfetchwhile(VQ0) -> Count = 10, %% add messages with sequential expiry VQ1 = variable_queue_publish( false, Count, - fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), + fun (N, Props) -> Props#message_properties{expiry = N} end, + fun erlang:term_to_binary/1, VQ0), + + %% fetch the first 5 messages + {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} = + rabbit_variable_queue:fetchwhile( + fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end, + fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) -> + {[Msg | MsgAcc], [AckTag | AckAcc]} + end, {[], []}, VQ1), + true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)], + + %% requeue them + {_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2), %% drop the first 5 messages - {_, undefined, VQ2} = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, false, VQ1), - - %% fetch five now - VQ3 = lists:foldl(fun (_N, VQN) -> - {{#basic_message{}, _, _, _}, VQM} = + {#message_properties{expiry = 6}, VQ4} = + rabbit_variable_queue:dropwhile( + fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3), + + %% fetch 5 + VQ5 = lists:foldl(fun (N, VQN) -> + {{Msg, _, _}, VQM} = rabbit_variable_queue:fetch(false, VQN), + true = msg2int(Msg) == N, VQM - end, VQ2, lists:seq(6, Count)), + end, VQ4, lists:seq(6, Count)), %% should be empty now - {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), + true = rabbit_variable_queue:is_empty(VQ5), - VQ4. + VQ5. test_dropwhile_varying_ram_duration(VQ0) -> + test_dropfetchwhile_varying_ram_duration( + fun (VQ1) -> + {_, VQ2} = rabbit_variable_queue:dropwhile( + fun (_) -> false end, VQ1), + VQ2 + end, VQ0). + +test_fetchwhile_varying_ram_duration(VQ0) -> + test_dropfetchwhile_varying_ram_duration( + fun (VQ1) -> + {_, ok, VQ2} = rabbit_variable_queue:fetchwhile( + fun (_) -> false end, + fun (_, _, _, A) -> A end, + ok, VQ1), + VQ2 + end, VQ0). + +test_dropfetchwhile_varying_ram_duration(Fun, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - {_, undefined, VQ3} = rabbit_variable_queue:dropwhile( - fun(_) -> false end, false, VQ2), + VQ3 = Fun(VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - {_, undefined, VQ6} = - rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5), + VQ6 = Fun(VQ5), VQ6. test_variable_queue_dynamic_duration_change(VQ0) -> @@ -2431,7 +2508,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + Len = rabbit_variable_queue:len(VQ2), {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). @@ -2496,8 +2574,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), - {{_Msg1, true, _AckTag1, Count1}, VQ8} = - rabbit_variable_queue:fetch(true, VQ7), + {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), + Count1 = rabbit_variable_queue:len(VQ8), VQ9 = variable_queue_publish(false, 1, VQ8), VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), {VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10), @@ -2519,7 +2597,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_variable_queue_fold_msg_on_disk(VQ0) -> VQ1 = variable_queue_publish(true, 1, VQ0), {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1), - VQ3 = rabbit_variable_queue:fold(fun (_M, _A) -> ok end, VQ2, AckTags), + VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end, + VQ2, AckTags), VQ3. test_queue_recover() -> @@ -2543,10 +2622,11 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = variable_queue_init(Q, true), - {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = + {{_Msg1, true, _AckTag1}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + CountMinusOne = rabbit_variable_queue:len(VQ2), _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), - rabbit_amqqueue:internal_delete(QName, QPid1) + rabbit_amqqueue:internal_delete(QName) end), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6dc65bab..3e4c7c86 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -17,11 +17,12 @@ -module(rabbit_variable_queue). -export([init/3, terminate/2, delete_and_terminate/2, purge/1, - publish/4, publish_delivered/4, discard/3, drain_confirmed/1, - dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, - depth/1, set_ram_duration_target/2, ram_duration/1, + publish/5, publish_delivered/4, discard/3, drain_confirmed/1, + dropwhile/2, fetchwhile/4, + fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, + is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0, fold/3]). + is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]). -export([start/1, stop/0]). @@ -255,7 +256,6 @@ q4, next_seq_id, pending_ack, - pending_ack_index, ram_ack_index, index_state, msg_store_clients, @@ -349,7 +349,7 @@ q4 :: ?QUEUE:?QUEUE(), next_seq_id :: seq_id(), pending_ack :: gb_tree(), - ram_ack_index :: gb_tree(), + ram_ack_index :: gb_set(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -521,16 +521,16 @@ purge(State = #vqstate { q4 = Q4, publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - len = Len, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - ram_msg_count = RamMsgCount, - unconfirmed = UC }) -> + IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, + len = Len, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + ram_msg_count = RamMsgCount, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps), + MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case ?QUEUE:is_empty(Q3) of false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; @@ -557,8 +557,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, durable = IsDurable, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) - #msg_status { is_delivered = true }, + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), @@ -579,27 +578,30 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). +dropwhile(Pred, State) -> + case queue_out(State) of + {empty, State1} -> + {undefined, a(State1)}; + {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> + case Pred(MsgProps) of + true -> {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, State2); + false -> {MsgProps, a(in_r(MsgStatus, State1))} + end + end. -dropwhile(Pred, AckRequired, State, Msgs) -> - End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S}; - (Next, S) -> {Next, undefined, S} - end, +fetchwhile(Pred, Fun, Acc, State) -> case queue_out(State) of {empty, State1} -> - End(undefined, a(State1)); + {undefined, Acc, a(State1)}; {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case {Pred(MsgProps), AckRequired} of - {true, true} -> - {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {{Msg, _, AckTag, _}, State3} = - internal_fetch(true, MsgStatus1, State2), - dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); - {true, false} -> - {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, AckRequired, State2, undefined); - {false, _} -> - End(MsgProps, a(in_r(MsgStatus, State1))) + case Pred(MsgProps) of + true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {{Msg, IsDelivered, AckTag}, State3} = + internal_fetch(true, MsgStatus1, State2), + Acc1 = Fun(Msg, IsDelivered, AckTag, Acc), + fetchwhile(Pred, Fun, Acc1, State3); + false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} end end. @@ -615,6 +617,16 @@ fetch(AckRequired, State) -> {Res, a(State3)} end. +drop(AckRequired, State) -> + case queue_out(State) of + {empty, State1} -> + {empty, a(State1)}; + {{value, MsgStatus}, State1} -> + {{_Msg, _IsDelivered, AckTag}, State2} = + internal_fetch(AckRequired, MsgStatus, State1), + {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} + end. + ack([], State) -> {[], State}; ack(AckTags, State) -> @@ -638,9 +650,9 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -fold(undefined, State, _AckTags) -> +foreach_ack(undefined, State, _AckTags) -> State; -fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> +foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> a(lists:foldl(fun(SeqId, State1) -> {MsgStatus, State2} = read_msg(gb_trees:get(SeqId, PA), false, State1), @@ -669,6 +681,26 @@ requeue(AckTags, #vqstate { delta = Delta, in_counter = InCounter + MsgCount, len = Len + MsgCount }))}. +fold(Fun, Acc, #vqstate { q1 = Q1, + q2 = Q2, + delta = #delta { start_seq_id = DeltaSeqId, + end_seq_id = DeltaSeqIdEnd }, + q3 = Q3, + q4 = Q4 } = State) -> + QFun = fun(MsgStatus, {Acc0, State0}) -> + {#msg_status { msg = Msg, msg_props = MsgProps }, State1 } = + read_msg(MsgStatus, false, State0), + {StopGo, AccNext} = Fun(Msg, MsgProps, Acc0), + {StopGo, {AccNext, State1}} + end, + {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4), + {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3), + {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2}, + DeltaSeqId, DeltaSeqIdEnd, State2), + {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2), + {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1), + {Acc5, State5}. + len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). @@ -722,7 +754,7 @@ ram_duration(State = #vqstate { {AvgAckIngressRate, AckIngress1} = update_rate(Now, AckTimestamp, AckInCount, AckIngress), - RamAckCount = gb_trees:size(RamAckIndex), + RamAckCount = gb_sets:size(RamAckIndex), Duration = %% msgs+acks / (msgs+acks/sec) == sec case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso @@ -801,7 +833,7 @@ status(#vqstate { {pending_acks , gb_trees:size(PA)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, - {ram_ack_count , gb_trees:size(RAI)}, + {ram_ack_count , gb_sets:size(RAI)}, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, @@ -864,11 +896,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; %% when requeueing, we re-add a msg_id to the unconfirmed set gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). -msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, - MsgProps = #message_properties { delivered = Delivered }) -> - %% TODO would it make sense to remove #msg_status.is_delivered? +msg_status(IsPersistent, IsDelivered, SeqId, + Msg = #basic_message { id = MsgId }, MsgProps) -> #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, - is_persistent = IsPersistent, is_delivered = Delivered, + is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = false, index_on_disk = false, msg_props = MsgProps }. @@ -1006,7 +1037,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q4 = ?QUEUE:new(), next_seq_id = NextSeqId, pending_ack = gb_trees:empty(), - ram_ack_index = gb_trees:empty(), + ram_ack_index = gb_sets:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, @@ -1118,14 +1149,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - {{Msg, IsDelivered, AckTag, Len1}, + {{Msg, IsDelivered, AckTag}, State1 #vqstate { ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, index_state = IndexState2, - len = Len1, + len = Len - 1, persistent_count = PCount1 }}. purge_betas_and_deltas(LensByStore, @@ -1223,18 +1253,15 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { seq_id = SeqId, - msg_id = MsgId, - msg_on_disk = MsgOnDisk } = MsgStatus, +record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus, State = #vqstate { pending_ack = PA, ram_ack_index = RAI, ack_in_counter = AckInCount}) -> - {AckEntry, RAI1} = - case MsgOnDisk of - true -> {m(trim_msg_status(MsgStatus)), RAI}; - false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} - end, - State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA), + RAI1 = case Msg of + undefined -> RAI; + _ -> gb_sets:insert(SeqId, RAI) + end, + State #vqstate { pending_ack = gb_trees:insert(SeqId, MsgStatus, PA), ram_ack_index = RAI1, ack_in_counter = AckInCount + 1}. @@ -1242,7 +1269,7 @@ remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> {gb_trees:get(SeqId, PA), State #vqstate { pending_ack = gb_trees:delete(SeqId, PA), - ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}. + ram_ack_index = gb_sets:delete_any(SeqId, RAI) }}. purge_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, @@ -1253,7 +1280,7 @@ purge_pending_ack(KeepPersistent, accumulate_ack(MsgStatus, Acc) end, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = gb_trees:empty(), - ram_ack_index = gb_trees:empty() }, + ram_ack_index = gb_sets:empty() }, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of error -> State1; @@ -1346,7 +1373,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- -%% Internal plumbing for requeue +%% Internal plumbing for requeue and fold %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> @@ -1415,6 +1442,41 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. +qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A; +qfoldl( Fun, {cont, Acc} = A, Q) -> + case ?QUEUE:out(Q) of + {empty, _Q} -> A; + {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1) + end. + +lfoldl(_Fun, {stop, _Acc} = A, _L) -> A; +lfoldl(_Fun, {cont, _Acc} = A, []) -> A; +lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T). + +delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> + {stop, {Acc, State}}; +delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> + {cont, {Acc, State}}; +delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, + #vqstate { index_state = IndexState, + msg_store_clients = MSCState } = State) -> + DeltaSeqId1 = lists:min( + [rabbit_queue_index:next_segment_boundary(DeltaSeqId), + DeltaSeqIdEnd]), + {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, + IndexState), + {StopCont, {Acc1, MSCState1}} = + lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered}, + {Acc0, MSCState0}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState0, IsPersistent, MsgId), + {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0), + {StopCont, {AccNext, MSCState1}} + end, {cont, {Acc, MSCState}}, List), + delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, + State #vqstate { index_state = IndexState1, + msg_store_clients = MSCState1 }). + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- @@ -1453,7 +1515,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = - case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), + case chunk_size(RamMsgCount + gb_sets:size(RamAckIndex), TargetRamCount) of 0 -> {false, State}; %% Reduce memory of pending acks and alphas. The order is @@ -1481,13 +1543,12 @@ limit_ram_acks(0, State) -> {0, State}; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> - case gb_trees:is_empty(RAI) of + case gb_sets:is_empty(RAI) of true -> {Quota, State}; false -> - {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), - MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} = - gb_trees:get(SeqId, PA), + {SeqId, RAI1} = gb_sets:take_largest(RAI), + MsgStatus = gb_trees:get(SeqId, PA), {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA), |