diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-27 18:23:13 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-27 18:23:13 +0000 |
commit | ba3741561ccefd4e9d16a75d9edd11085b1e7161 (patch) | |
tree | 2e71b47f08f323225e61e1926abda8a98c10bf6d | |
parent | 9e8f4fc04ed36a6d7243cf050ce590e14375e15e (diff) | |
parent | 6f965d3ddfce609f3da3e275840d2d0351b66baf (diff) | |
download | rabbitmq-server-ba3741561ccefd4e9d16a75d9edd11085b1e7161.tar.gz |
re-merge bug25303 into default
-rw-r--r-- | README | 2 | ||||
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 30 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 119 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 14 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 8 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 136 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 62 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 16 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 6 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 4 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 56 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 79 |
14 files changed, 269 insertions, 274 deletions
@@ -1 +1 @@ -Please see http://www.rabbitmq.com/build-server.html for build instructions.
\ No newline at end of file +Please see http://www.rabbitmq.com/build-server.html for build instructions. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index b2832b45..0ccb80bf 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -78,8 +78,7 @@ -record(event, {type, props, timestamp}). --record(message_properties, {expiry, needs_confirming = false, - delivered = false}). +-record(message_properties, {expiry, needs_confirming = false}). -record(plugin, {name, %% atom() version, %% string() diff --git a/src/rabbit.erl b/src/rabbit.erl index c3a6d283..7b8348fc 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -400,13 +400,11 @@ status() -> is_running() -> is_running(node()). -is_running(Node) -> - rabbit_nodes:is_running(Node, rabbit). +is_running(Node) -> rabbit_nodes:is_running(Node, rabbit). environment() -> - lists:keysort( - 1, [P || P = {K, _} <- application:get_all_env(rabbit), - K =/= default_pass]). + lists:keysort(1, [P || P = {K, _} <- application:get_all_env(rabbit), + K =/= default_pass]). rotate_logs(BinarySuffix) -> Suffix = binary_to_list(BinarySuffix), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8ce1160c..7827b839 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,7 +34,7 @@ -export([start_mirroring/1, stop_mirroring/1]). %% internal --export([internal_declare/2, internal_delete/2, run_backing_queue/3, +-export([internal_declare/2, internal_delete/1, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -156,11 +156,11 @@ -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). --spec(internal_delete/2 :: - (name(), pid()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit() | - fun (() -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). +-spec(internal_delete/1 :: + (name()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit() | + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -257,7 +257,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName, QPid), + false -> TailFun = internal_delete(QueueName), fun () -> TailFun(), ExistingQ end end end @@ -563,7 +563,7 @@ internal_delete1(QueueName) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName). -internal_delete(QueueName, QPid) -> +internal_delete(QueueName) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of @@ -573,8 +573,7 @@ internal_delete(QueueName, QPid) -> fun() -> ok = T(), ok = rabbit_event:notify(queue_deleted, - [{pid, QPid}, - {name, QueueName}]) + [{name, QueueName}]) end end end). @@ -588,13 +587,13 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring). -stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). +start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring). +stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = - qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || + qlc:e(qlc:q([{QName, delete_queue(QName)} || #amqqueue{name = QName, pid = Pid, slave_pids = []} <- mnesia:table(rabbit_queue), @@ -607,10 +606,9 @@ on_node_down(Node) -> fun () -> T(), lists:foreach( - fun({QName, QPid}) -> + fun(QName) -> ok = rabbit_event:notify(queue_deleted, - [{pid, QPid}, - {name, QName}]) + [{name, QName}]) end, Qs) end end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b3c44a50..74717ace 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -85,7 +85,7 @@ %%---------------------------------------------------------------------------- -define(STATISTICS_KEYS, - [pid, + [name, policy, exclusive_consumer_pid, exclusive_consumer_tag, @@ -101,16 +101,14 @@ ]). -define(CREATION_EVENT_KEYS, - [pid, - name, + [name, durable, auto_delete, arguments, owner_pid ]). --define(INFO_KEYS, - ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]). %%---------------------------------------------------------------------------- @@ -122,11 +120,10 @@ info_keys() -> ?INFO_KEYS. init(Q) -> process_flag(trap_exit, true), - State = #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, has_had_consumers = false, - backing_queue = backing_queue_module(Q), + backing_queue = undefined, backing_queue_state = undefined, active_consumers = queue:new(), expires = undefined, @@ -183,7 +180,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName}, fun (BQS) -> BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete doesn't return 'ok'. - rabbit_amqqueue:internal_delete(QName, self()), + rabbit_amqqueue:internal_delete(QName), BQS1 end, State). @@ -193,7 +190,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- declare(Recover, From, State = #q{q = Q, - backing_queue = BQ, + backing_queue = undefined, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of #amqqueue{} = Q1 -> @@ -205,9 +202,11 @@ declare(Recover, From, State = #q{q = Q, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + BQ = backing_queue_module(Q1), BQS = bq_init(BQ, Q, Recover), recovery_barrier(Recover), - State1 = process_args(State#q{backing_queue_state = BQS}), + State1 = process_args(State#q{backing_queue = BQ, + backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -277,7 +276,8 @@ terminate_shutdown(Fun, State) -> case BQS of undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), - [emit_consumer_deleted(Ch, CTag) + QName = qname(State), + [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _} <- consumers(State1)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -485,11 +485,10 @@ deliver_msg_to_consumer(DeliverFun, {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> - {{Message, IsDelivered, AckTag, _Remaining}, State1} = - fetch(AckRequired, State), + {Result, State1} = fetch(AckRequired, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State1), - {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}. + {Result, BQ:is_empty(BQS), State2}. confirm_messages([], State) -> State; @@ -542,8 +541,8 @@ run_message_queue(State) -> State2. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, - Props = #message_properties{delivered = Delivered}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Props, Delivered, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> deliver_msgs_to_consumers( @@ -565,15 +564,15 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), - Props = message_properties(Message, Confirm, Delivered, State), - case attempt_delivery(Delivery, Props, State1) of + Props = message_properties(Message, Confirm, State), + case attempt_delivery(Delivery, Props, Delivered, State1) of {true, State2} -> State2; %% The next one is an optimisation {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) end. @@ -605,9 +604,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) -> (CP /= ChPid) or (CTag /= ConsumerTag) end, Queue). -remove_consumers(ChPid, Queue) -> +remove_consumers(ChPid, Queue, QName) -> queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> - emit_consumer_deleted(ChPid, CTag), + emit_consumer_deleted(ChPid, CTag, QName), false; (_) -> true @@ -648,7 +647,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> - _ = remove_consumers(ChPid, Blocked), %% for stats emission + QName = qname(State), + _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of @@ -656,7 +656,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers), + ChPid, State#q.active_consumers, + QName), senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; @@ -704,10 +705,9 @@ subtract_acks(ChPid, AckTags, State, Fun) -> Fun(State) end. -message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) -> +message_properties(Message, Confirm, #q{ttl = TTL}) -> #message_properties{expiry = calculate_msg_expiry(Message, TTL), - needs_confirming = Confirm == eventually, - delivered = Delivered}. + needs_confirming = Confirm == eventually}. calculate_msg_expiry(#basic_message{content = Content}, TTL) -> #content{properties = Props} = @@ -955,19 +955,19 @@ emit_stats(State) -> emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). -emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> +emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) -> rabbit_event:notify(consumer_created, [{consumer_tag, ConsumerTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, {channel, ChPid}, - {queue, self()}]). + {queue, QName}]). -emit_consumer_deleted(ChPid, ConsumerTag) -> +emit_consumer_deleted(ChPid, ConsumerTag, QName) -> rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag}, {channel, ChPid}, - {queue, self()}]). + {queue, QName}]). %%---------------------------------------------------------------------------- @@ -1061,8 +1061,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, case fetch(AckRequired, drop_expired_messages(State1)) of {empty, State2} -> reply(empty, State2); - {{Message, IsDelivered, AckTag, Remaining}, State2} -> - State3 = + {{Message, IsDelivered, AckTag}, State2} -> + State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), ChAckTags1 = sets:add_element(AckTag, ChAckTags), @@ -1071,14 +1071,13 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> State2 end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State3) + reply({ok, BQ:len(BQS), Msg}, State3) end; handle_call({basic_consume, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{exclusive_consumer = ExistingHolder}) -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume, - State) of + _From, State = #q{exclusive_consumer = Holder}) -> + case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> @@ -1087,7 +1086,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder + true -> Holder end, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, @@ -1102,7 +1101,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, run_message_queue(State1#q{active_consumers = AC1}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck), + not NoAck, qname(State2)), reply(ok, State2) end; @@ -1113,7 +1112,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, not_found -> reply(ok, State); C = #cr{blocked_consumers = Blocked} -> - emit_consumer_deleted(ChPid, ConsumerTag), + emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), State1 = State#q{ @@ -1123,7 +1122,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, end, active_consumers = remove_consumer( ChPid, ConsumerTag, - State#q.active_consumers)}, + State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(From, ok, State1) @@ -1154,31 +1153,16 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(requeue(AckTags, ChPid, State)); -handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - %% lookup again to get policy for init_with_existing_bq - {ok, Q} = rabbit_amqqueue:lookup(qname(State)), - true = BQ =/= rabbit_mirror_queue_master, %% assertion - BQ1 = rabbit_mirror_queue_master, - BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), - reply(ok, State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); - -handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - BQ = rabbit_mirror_queue_master, %% assertion - {BQ1, BQS1} = BQ:stop_mirroring(BQS), - reply(ok, State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); - handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + QName = qname(State), case Exclusive of - none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || + none -> [emit_consumer_created( + Ch, CTag, false, AckRequired, QName) || {Ch, CTag, AckRequired} <- consumers(State)]; {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), - emit_consumer_created(Ch, CTag, true, AckRequired) + emit_consumer_created(Ch, CTag, true, AckRequired, QName) end, reply(ok, State). @@ -1300,6 +1284,23 @@ handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) -> cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State) end; +handle_cast(start_mirroring, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + noreply(State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + +handle_cast(stop_mirroring, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + noreply(State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index ffa716b6..e2945e1d 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -26,13 +26,9 @@ -type(msg_ids() :: [rabbit_types:msg_id()]). -type(fetch_result(Ack) :: - ('empty' | - %% Message, IsDelivered, AckTag, Remaining_Len - {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). + ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). -type(drop_result(Ack) :: - ('empty' | - %% MessageId, AckTag, Remaining_Len - {rabbit_types:msg_id(), Ack, non_neg_integer()})). + ('empty' | {rabbit_types:msg_id(), Ack})). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(async_callback() :: @@ -82,8 +78,8 @@ %% Publish a message. -callback publish(rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) -> - state(). + rabbit_types:message_properties(), boolean(), pid(), + state()) -> state(). %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls @@ -224,7 +220,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, + {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 764911b9..ed8f797a 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -286,12 +286,12 @@ next_state(S, Res, {call, ?BQMOD, fold, _Args}) -> postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, case Res of - {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} -> + {{MsgFetched, _IsDelivered, AckTag}, _BQ} -> {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), MsgFetched =:= Msg andalso not proplists:is_defined(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms) andalso - RemainingLen =:= Len - 1; + Len =/= 0; {empty, _BQ} -> Len =:= 0 end; @@ -299,14 +299,14 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> postcondition(S, {call, ?BQMOD, drop, _Args}, Res) -> #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, case Res of - {{MsgIdFetched, AckTag, RemainingLen}, _BQ} -> + {{MsgIdFetched, AckTag}, _BQ} -> {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), MsgId = eval({call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}), MsgIdFetched =:= MsgId andalso not proplists:is_defined(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms) andalso - RemainingLen =:= Len - 1; + Len =/= 0; {empty, _BQ} -> Len =:= 0 end; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b97af6d8..b1ef3b6b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,8 +35,9 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, - virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, delivering_queues, + virtual_host, most_recently_declared_queue, + queue_names, queue_monitors, consumer_mapping, + blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). @@ -194,6 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, + queue_names = dict:new(), queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), @@ -334,9 +336,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), - erase_queue_stats(QPid), - noreply(State4#ch{queue_monitors = pmon:erase( - QPid, State4#ch.queue_monitors)}); + #ch{queue_names = QNames, queue_monitors = QMons} = State4, + case dict:find(QPid, QNames) of + {ok, QName} -> erase_queue_stats(QName); + error -> ok + end, + noreply(State4#ch{queue_names = dict:erase(QPid, QNames), + queue_monitors = pmon:erase(QPid, QMons)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -677,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, QPid, _MsgId, Redelivered, + Msg = {QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -689,7 +695,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - State1 = monitor_delivering_queue(NoAck, QPid, State), + State1 = monitor_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} @@ -728,10 +734,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q = #amqqueue{pid = QPid}} -> + {ok, Q = #amqqueue{pid = QPid, name = QName}} -> CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), State1 = monitor_delivering_queue( - NoAck, QPid, State#ch{consumer_mapping = CM1}), + NoAck, QPid, QName, + State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1126,9 +1133,12 @@ consumer_monitor(ConsumerTag, State end. -monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons, - delivering_queues = DQ}) -> - State#ch{queue_monitors = pmon:monitor(QPid, QMons), +monitor_delivering_queue(NoAck, QPid, QName, + State = #ch{queue_names = QNames, + queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_names = dict:store(QPid, QName, QNames), + queue_monitors = pmon:monitor(QPid, QMons), delivering_queues = case NoAck of true -> DQ; false -> sets:add_element(QPid, DQ) @@ -1233,18 +1243,18 @@ reject(Requeue, Acked, Limiter) -> ok = notify_limiter(Limiter, Acked). record_sent(ConsumerTag, AckRequired, - Msg = {_QName, QPid, MsgId, Redelivered, _Message}, + Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, trace_state = TraceState}) -> - incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State); + true -> incr_stats([{queue_stats, QName, 1}], redeliver, State); false -> ok end, rabbit_trace:tap_trace_out(Msg, TraceState), @@ -1277,11 +1287,15 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. -ack(Acked, State) -> +ack(Acked, State = #ch{queue_names = QNames}) -> Incs = fold_per_queue( fun (QPid, MsgIds, L) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - [{queue_stats, QPid, length(MsgIds)} | L] + case dict:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + [{queue_stats, QName, Count} | L]; + error -> L + end end, [], Acked), ok = notify_limiter(State#ch.limiter, Acked), incr_stats(Incs, ack, State). @@ -1339,23 +1353,42 @@ notify_limiter(Limiter, Acked) -> deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, msg_seq_no = MsgSeqNo}, - QNames}, State) -> - {RoutingRes, DeliveredQPids} = - rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = State#ch{queue_monitors = - pmon:monitor_all(DeliveredQPids, - State#ch.queue_monitors)}, - State2 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, State1), + DelQNames}, State = #ch{queue_names = QNames, + queue_monitors = QMons}) -> + Qs = rabbit_amqqueue:lookup(DelQNames), + {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery), + %% The pmon:monitor_all/2 monitors all queues to which we + %% delivered. But we want to monitor even queues we didn't deliver + %% to, since we need their 'DOWN' messages to clean + %% queue_names. So we also need to monitor each QPid from + %% queues. But that only gets the masters (which is fine for + %% cleaning queue_names), so we need the union of both. + %% + %% ...and we need to add even non-delivered queues to queue_names + %% since alternative algorithms to update queue_names less + %% frequently would in fact be more expensive in the common case. + {QNames1, QMons1} = + lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, + {QNames0, QMons0}) -> + {case dict:is_key(QPid, QNames0) of + true -> QNames0; + false -> dict:store(QPid, QName, QNames0) + end, pmon:monitor(QPid, QMons0)} + end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), + State1 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, + State#ch{queue_names = QNames1, + queue_monitors = QMons1}), incr_stats([{exchange_stats, XName, 1} | - [{queue_exchange_stats, {QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State2), - State2. + [{queue_exchange_stats, {QName, XName}, 1} || + QPid <- DeliveredQPids, + {ok, QName} <- [dict:find(QPid, QNames1)]]], + publish, State1), + State1. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}], - return_unroutable, State), + incr_stats([{exchange_stats, XName, 1}], return_unroutable, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1489,24 +1522,23 @@ emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> - CoarseStats = infos(?STATISTICS_KEYS, State), + Coarse = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(State, #ch.stats_timer) of - coarse -> - rabbit_event:notify(channel_stats, Extra ++ CoarseStats); - fine -> - FineStats = - [{channel_queue_stats, - [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, - {channel_exchange_stats, - [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, - {channel_queue_exchange_stats, - [{QX, Stats} || - {{queue_exchange_stats, QX}, Stats} <- get()]}], - rabbit_event:notify(channel_stats, - Extra ++ CoarseStats ++ FineStats) + coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse); + fine -> Fine = [{channel_queue_stats, + [{QName, Stats} || + {{queue_stats, QName}, Stats} <- get()]}, + {channel_exchange_stats, + [{XName, Stats} || + {{exchange_stats, XName}, Stats} <- get()]}, + {channel_queue_exchange_stats, + [{QX, Stats} || + {{queue_exchange_stats, QX}, Stats} <- get()]}], + rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine) end. -erase_queue_stats(QPid) -> - erase({queue_stats, QPid}), +erase_queue_stats(QName) -> + erase({queue_stats, QName}), [erase({queue_exchange_stats, QX}) || - {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), + QName0 =:= QName]. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 15aeea01..c8a361b1 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/4, + purge/1, publish/5, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, @@ -38,7 +38,6 @@ coordinator, backing_queue, backing_queue_state, - set_delivered, seen_status, confirmed, ack_msg_id, @@ -55,7 +54,6 @@ coordinator :: pid(), backing_queue :: atom(), backing_queue_state :: any(), - set_delivered :: non_neg_integer(), seen_status :: dict(), confirmed :: [rabbit_guid:guid()], ack_msg_id :: dict(), @@ -114,7 +112,6 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = 0, seen_status = dict:new(), confirmed = [], ack_msg_id = dict:new(), @@ -136,8 +133,8 @@ terminate({shutdown, dropped} = Reason, %% in without this node being restarted. Thus we must do the full %% blown delete_and_terminate now, but only locally: we do not %% broadcast delete_and_terminate. - State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), - set_delivered = 0 }; + State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}; + terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -148,8 +145,7 @@ terminate(Reason, delete_and_terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> stop_all_slaves(Reason, State), - State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), - set_delivered = 0 }. + State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. stop_all_slaves(Reason, #state{gm = GM}) -> Info = gm:info(GM), @@ -175,17 +171,16 @@ purge(State = #state { gm = GM, backing_queue_state = BQS }) -> ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}), {Count, BQS1} = BQ:purge(BQS), - {Count, State #state { backing_queue_state = BQS1, - set_delivered = 0 }}. + {Count, State #state { backing_queue_state = BQS1 }}. -publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, +publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}), - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, @@ -224,7 +219,6 @@ discard(MsgId, ChPid, State = #state { gm = GM, dropwhile(Pred, AckRequired, State = #state{gm = GM, backing_queue = BQ, - set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), @@ -234,9 +228,7 @@ dropwhile(Pred, AckRequired, 0 -> ok; _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}) end, - SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - {Next, Msgs, State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1 } }. + {Next, Msgs, State #state { backing_queue_state = BQS1 } }. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -268,34 +260,25 @@ drain_confirmed(State = #state { backing_queue = BQ, seen_status = SS1, confirmed = [] }}. -fetch(AckRequired, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - set_delivered = SetDelivered }) -> +fetch(AckRequired, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, case Result of empty -> {Result, State1}; - {Message, IsDelivered, AckTag, Remaining} -> - ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}), - IsDelivered1 = IsDelivered orelse SetDelivered > 0, - {{Message, IsDelivered1, AckTag, Remaining}, - drop(Message#basic_message.id, AckTag, State1)} + {#basic_message{id = MsgId}, _IsDelivered, AckTag} -> + {Result, drop(MsgId, AckTag, State1)} end. -drop(AckRequired, State = #state { gm = GM, - backing_queue = BQ, +drop(AckRequired, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {Result, BQS1} = BQ:drop(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, - case Result of - empty -> - {Result, State1}; - {MsgId, AckTag, Remaining} -> - ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}), - {Result, drop(MsgId, AckTag, State1)} - end. + {Result, case Result of + empty -> State1; + {MsgId, AckTag} -> drop(MsgId, AckTag, State1) + end}. ack(AckTags, State = #state { gm = GM, backing_queue = BQ, @@ -423,7 +406,6 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS1, - set_delivered = Len, seen_status = SeenStatus, confirmed = [], ack_msg_id = dict:new(), @@ -458,10 +440,12 @@ depth_fun() -> %% Helpers %% --------------------------------------------------------------------------- -drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered, - ack_msg_id = AM }) -> - State #state { set_delivered = lists:max([0, SetDelivered - 1]), - ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }. +drop(MsgId, AckTag, State = #state { ack_msg_id = AM, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}), + State #state { ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }. maybe_store_acktag(undefined, _MsgId, AM) -> AM; maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM). diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 2b3bd027..58f20476 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -133,11 +133,11 @@ on_node_up() -> end end, [], rabbit_queue) end), - [{ok, _} = add_mirror(QName, node()) || QName <- QNames], + [add_mirror(QName, node()) || QName <- QNames], ok. drop_mirrors(QName, Nodes) -> - [{ok, _} = drop_mirror(QName, Node) || Node <- Nodes], + [drop_mirror(QName, Node) || Node <- Nodes], ok. drop_mirror(QName, MirrorNode) -> @@ -159,7 +159,7 @@ drop_mirror(QName, MirrorNode) -> end). add_mirrors(QName, Nodes) -> - [{ok, _} = add_mirror(QName, Node) || Node <- Nodes], + [add_mirror(QName, Node) || Node <- Nodes], ok. add_mirror(QName, MirrorNode) -> @@ -183,15 +183,7 @@ start_child(Name, MirrorNode, Q) -> fun () -> rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) end) of - {ok, undefined} -> - %% this means the mirror process was - %% already running on the given node. - {ok, already_mirrored}; - {ok, down} -> - %% Node went down between us deciding to start a mirror - %% and actually starting it. Which is fine. - {ok, node_down}; - {ok, SPid} -> + {ok, SPid} when is_pid(SPid) -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, SPid]), {ok, started}; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3ad8eb77..9354f485 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -318,7 +318,6 @@ prioritise_cast(Msg, _State) -> {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; {gm, _Msg} -> 5; - {post_commit, _Txn, _AckTags} -> 4; _ -> 0 end. @@ -703,7 +702,7 @@ process_instruction({publish, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({publish_delivered, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> @@ -727,8 +726,7 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State1 = lists:foldl( fun (const, StateN = #state{backing_queue_state = BQSN}) -> - {{MsgId, AckTag, _Remaining}, BQSN1} = - BQ:drop(AckRequired, BQSN), + {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN), maybe_store_ack( AckRequired, MsgId, AckTag, StateN #state { backing_queue_state = BQSN1 }) diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 2717cc92..2c997f16 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -166,8 +166,8 @@ update_policies(VHost) -> [update_queue(Q, Policies) || Q <- rabbit_amqqueue:list(VHost)]} end), - [notify(X) || X <- Xs], - [notify(Q) || Q <- Qs], + [catch notify(X) || X <- Xs], + [catch notify(Q) || Q <- Qs], ok. update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d5c096a1..bb4ddceb 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1286,8 +1286,7 @@ test_statistics() -> QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, - {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), - QPid = Q#amqqueue.pid, + QRes = rabbit_misc:r(<<"/">>, queue, QName), X = rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]), @@ -1311,9 +1310,9 @@ test_statistics() -> length(proplists:get_value( channel_queue_exchange_stats, E)) > 0 end), - [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), + [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2), - [{{QPid,X},[{publish,1}]}] = + [{{QRes,X},[{publish,1}]}] = proplists:get_value(channel_queue_exchange_stats, Event2), %% Check the stats remove stuff on queue deletion @@ -1338,31 +1337,31 @@ test_refresh_events(SecondaryNode) -> [channel_created, queue_created]), {_Writer, Ch} = test_spawn(), - expect_events(Ch, channel_created), + expect_events(pid, Ch, channel_created), rabbit_channel:shutdown(Ch), {_Writer2, Ch2} = test_spawn(SecondaryNode), - expect_events(Ch2, channel_created), + expect_events(pid, Ch2, channel_created), rabbit_channel:shutdown(Ch2), - {new, #amqqueue { pid = QPid } = Q} = + {new, #amqqueue{name = QName} = Q} = rabbit_amqqueue:declare(test_queue(), false, false, [], none), - expect_events(QPid, queue_created), + expect_events(name, QName, queue_created), rabbit_amqqueue:delete(Q, false, false), rabbit_tests_event_receiver:stop(), passed. -expect_events(Pid, Type) -> - expect_event(Pid, Type), +expect_events(Tag, Key, Type) -> + expect_event(Tag, Key, Type), rabbit:force_event_refresh(), - expect_event(Pid, Type). + expect_event(Tag, Key, Type). -expect_event(Pid, Type) -> +expect_event(Tag, Key, Type) -> receive #event{type = Type, props = Props} -> - case pget(pid, Props) of - Pid -> ok; - _ -> expect_event(Pid, Type) + case pget(Tag, Props) of + Key -> ok; + _ -> expect_event(Tag, Key, Type) end after ?TIMEOUT -> throw({failed_to_receive_event, Type}) end. @@ -2231,15 +2230,16 @@ variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) -> false -> 1 end}, PayloadFun(N)), - PropFun(N, #message_properties{}), self(), VQN) + PropFun(N, #message_properties{}), false, self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - IsDelivered, AckTagN, Rem}, VQM} = + IsDelivered, AckTagN}, VQM} = rabbit_variable_queue:fetch(true, VQN), + Rem = rabbit_variable_queue:len(VQM), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -2346,7 +2346,7 @@ test_variable_queue_requeue(VQ0) -> VQM end, VQ4, Subset), VQ6 = lists:foldl(fun (AckTag, VQa) -> - {{#basic_message{}, true, AckTag, _}, VQb} = + {{#basic_message{}, true, AckTag}, VQb} = rabbit_variable_queue:fetch(true, VQa), VQb end, VQ5, lists:reverse(Acks)), @@ -2386,14 +2386,16 @@ test_drop(VQ0) -> %% start by sending a messages VQ1 = variable_queue_publish(false, 1, VQ0), %% drop message with AckRequired = true - {{MsgId, AckTag, 0}, VQ2} = rabbit_variable_queue:drop(true, VQ1), + {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1), + true = rabbit_variable_queue:is_empty(VQ2), true = AckTag =/= undefinded, %% drop again -> empty {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2), %% requeue {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3), %% drop message with AckRequired = false - {{MsgId, undefined, 0}, VQ5} = rabbit_variable_queue:drop(false, VQ4), + {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4), + true = rabbit_variable_queue:is_empty(VQ5), VQ5. test_dropwhile(VQ0) -> @@ -2412,7 +2414,7 @@ test_dropwhile(VQ0) -> %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> - {{#basic_message{}, _, _, _}, VQM} = + {{#basic_message{}, _, _}, VQM} = rabbit_variable_queue:fetch(false, VQN), VQM end, VQ2, lists:seq(6, Count)), @@ -2465,7 +2467,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + Len = rabbit_variable_queue:len(VQ2), {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). @@ -2530,8 +2533,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), - {{_Msg1, true, _AckTag1, Count1}, VQ8} = - rabbit_variable_queue:fetch(true, VQ7), + {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), + Count1 = rabbit_variable_queue:len(VQ8), VQ9 = variable_queue_publish(false, 1, VQ8), VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), {VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10), @@ -2578,10 +2581,11 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = variable_queue_init(Q, true), - {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = + {{_Msg1, true, _AckTag1}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + CountMinusOne = rabbit_variable_queue:len(VQ2), _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), - rabbit_amqqueue:internal_delete(QName, QPid1) + rabbit_amqqueue:internal_delete(QName) end), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b826413a..7fbac782 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -17,7 +17,7 @@ -module(rabbit_variable_queue). -export([init/3, terminate/2, delete_and_terminate/2, purge/1, - publish/4, publish_delivered/4, discard/3, drain_confirmed/1, + publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, @@ -348,7 +348,7 @@ q4 :: ?QUEUE:?QUEUE(), next_seq_id :: seq_id(), pending_ack :: gb_tree(), - ram_ack_index :: gb_tree(), + ram_ack_index :: gb_set(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -520,16 +520,16 @@ purge(State = #vqstate { q4 = Q4, publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - len = Len, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - ram_msg_count = RamMsgCount, - unconfirmed = UC }) -> + IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, + len = Len, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + ram_msg_count = RamMsgCount, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps), + MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case ?QUEUE:is_empty(Q3) of false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; @@ -556,8 +556,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, durable = IsDurable, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) - #msg_status { is_delivered = true }, + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), @@ -591,8 +590,8 @@ dropwhile(Pred, AckRequired, State, Msgs) -> case {Pred(MsgProps), AckRequired} of {true, true} -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {{Msg, _, AckTag, _}, State3} = - internal_fetch(true, MsgStatus1, State2), + {{Msg, _IsDelivered, AckTag}, State3} = + internal_fetch(true, MsgStatus1, State2), dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); {true, false} -> {_, State2} = internal_fetch(false, MsgStatus, State1), @@ -619,9 +618,9 @@ drop(AckRequired, State) -> {empty, State1} -> {empty, a(State1)}; {{value, MsgStatus}, State1} -> - {{_Msg, _IsDelivered, AckTag, Remaining}, State2} = + {{_Msg, _IsDelivered, AckTag}, State2} = internal_fetch(AckRequired, MsgStatus, State1), - {{MsgStatus#msg_status.msg_id, AckTag, Remaining}, a(State2)} + {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} end. ack([], State) -> @@ -749,7 +748,7 @@ ram_duration(State = #vqstate { {AvgAckIngressRate, AckIngress1} = update_rate(Now, AckTimestamp, AckInCount, AckIngress), - RamAckCount = gb_trees:size(RamAckIndex), + RamAckCount = gb_sets:size(RamAckIndex), Duration = %% msgs+acks / (msgs+acks/sec) == sec case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso @@ -828,7 +827,7 @@ status(#vqstate { {pending_acks , gb_trees:size(PA)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, - {ram_ack_count , gb_trees:size(RAI)}, + {ram_ack_count , gb_sets:size(RAI)}, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, @@ -891,11 +890,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; %% when requeueing, we re-add a msg_id to the unconfirmed set gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). -msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, - MsgProps = #message_properties { delivered = Delivered }) -> - %% TODO would it make sense to remove #msg_status.is_delivered? +msg_status(IsPersistent, IsDelivered, SeqId, + Msg = #basic_message { id = MsgId }, MsgProps) -> #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, - is_persistent = IsPersistent, is_delivered = Delivered, + is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = false, index_on_disk = false, msg_props = MsgProps }. @@ -1033,7 +1031,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, q4 = ?QUEUE:new(), next_seq_id = NextSeqId, pending_ack = gb_trees:empty(), - ram_ack_index = gb_trees:empty(), + ram_ack_index = gb_sets:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, @@ -1145,14 +1143,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - {{Msg, IsDelivered, AckTag, Len1}, + {{Msg, IsDelivered, AckTag}, State1 #vqstate { ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, index_state = IndexState2, - len = Len1, + len = Len - 1, persistent_count = PCount1 }}. purge_betas_and_deltas(LensByStore, @@ -1250,18 +1247,15 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { seq_id = SeqId, - msg_id = MsgId, - msg_on_disk = MsgOnDisk } = MsgStatus, +record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus, State = #vqstate { pending_ack = PA, ram_ack_index = RAI, ack_in_counter = AckInCount}) -> - {AckEntry, RAI1} = - case MsgOnDisk of - true -> {m(trim_msg_status(MsgStatus)), RAI}; - false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} - end, - State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA), + RAI1 = case Msg of + undefined -> RAI; + _ -> gb_sets:insert(SeqId, RAI) + end, + State #vqstate { pending_ack = gb_trees:insert(SeqId, MsgStatus, PA), ram_ack_index = RAI1, ack_in_counter = AckInCount + 1}. @@ -1269,7 +1263,7 @@ remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> {gb_trees:get(SeqId, PA), State #vqstate { pending_ack = gb_trees:delete(SeqId, PA), - ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}. + ram_ack_index = gb_sets:delete_any(SeqId, RAI) }}. purge_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, @@ -1280,7 +1274,7 @@ purge_pending_ack(KeepPersistent, accumulate_ack(MsgStatus, Acc) end, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = gb_trees:empty(), - ram_ack_index = gb_trees:empty() }, + ram_ack_index = gb_sets:empty() }, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of error -> State1; @@ -1501,7 +1495,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, }) -> {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = - case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), + case chunk_size(RamMsgCount + gb_sets:size(RamAckIndex), TargetRamCount) of 0 -> {false, State}; %% Reduce memory of pending acks and alphas. The order is @@ -1529,13 +1523,12 @@ limit_ram_acks(0, State) -> {0, State}; limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, ram_ack_index = RAI }) -> - case gb_trees:is_empty(RAI) of + case gb_sets:is_empty(RAI) of true -> {Quota, State}; false -> - {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), - MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} = - gb_trees:get(SeqId, PA), + {SeqId, RAI1} = gb_sets:take_largest(RAI), + MsgStatus = gb_trees:get(SeqId, PA), {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA), |