diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-04 19:31:13 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-04 19:31:13 +0000 |
commit | af0247f59ca83673146ad98b971823536488aae4 (patch) | |
tree | 0730eaaf0f5997c27afb42ab6d03e31d4914e6cd | |
parent | 6532a3ca4f8ec7b51e3810c4c2220aaeea681935 (diff) | |
parent | bf091440f2ad1dc80462b9145c7063f971f2cb2e (diff) | |
download | rabbitmq-server-bug25327.tar.gz |
merge default into bug25327bug25327
-rw-r--r-- | docs/rabbitmqctl.1.xml | 30 | ||||
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/credit_flow.erl | 75 | ||||
-rw-r--r-- | src/pmon.erl | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 235 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 15 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 43 | ||||
-rw-r--r-- | src/rabbit_guid.erl | 19 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 60 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 49 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 232 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 44 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 203 | ||||
-rw-r--r-- | src/rabbit_trace.erl | 19 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 63 |
17 files changed, 745 insertions, 374 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 34947b66..a95f7b3d 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -446,6 +446,36 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis> + </term> + <listitem> + <variablelist> + <varlistentry> + <term>queue</term> + <listitem> + <para> + The name of the queue to synchronise. + </para> + </listitem> + </varlistentry> + </variablelist> + <para> + Instructs a mirrored queue with unsynchronised slaves to + synchronise itself. The queue will block while + synchronisation takes place (all publishers to and + consumers from the queue will block). The queue must be + mirrored, and must not have any pending unacknowledged + messages for this command to succeed. + </para> + <para> + Note that unsynchronised queues from which messages are + being drained will become synchronised eventually. This + command is primarily useful for queues which are not + being drained. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 0ccb80bf..7385b4b3 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -27,9 +27,6 @@ -record(vhost, {virtual_host, dummy}). --record(connection, {protocol, user, timeout_sec, frame_max, vhost, - client_properties, capabilities}). - -record(content, {class_id, properties, %% either 'none', or a decoded record/tuple diff --git a/src/credit_flow.erl b/src/credit_flow.erl index ba99811f..102c353f 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -52,6 +52,22 @@ %%---------------------------------------------------------------------------- +%% process dict update macro - eliminates the performance-hurting +%% closure creation a HOF would introduce +-define(UPDATE(Key, Default, Var, Expr), + begin + %% We deliberately allow Var to escape from the case here + %% to be used in Expr. Any temporary var we introduced + %% would also escape, and might conflict. + case get(Key) of + undefined -> Var = Default; + Var -> ok + end, + put(Key, Expr) + end). + +%%---------------------------------------------------------------------------- + %% There are two "flows" here; of messages and of credit, going in %% opposite directions. The variable names "From" and "To" refer to %% the flow of credit, but the function names refer to the flow of @@ -66,29 +82,33 @@ send(From) -> send(From, ?DEFAULT_CREDIT). send(From, {InitialCredit, _MoreCreditAfter}) -> - update({credit_from, From}, InitialCredit, - fun (1) -> block(From), - 0; - (C) -> C - 1 - end). + ?UPDATE({credit_from, From}, InitialCredit, C, + if C == 1 -> block(From), + 0; + true -> C - 1 + end). ack(To) -> ack(To, ?DEFAULT_CREDIT). ack(To, {_InitialCredit, MoreCreditAfter}) -> - update({credit_to, To}, MoreCreditAfter, - fun (1) -> grant(To, MoreCreditAfter), - MoreCreditAfter; - (C) -> C - 1 - end). + ?UPDATE({credit_to, To}, MoreCreditAfter, C, + if C == 1 -> grant(To, MoreCreditAfter), + MoreCreditAfter; + true -> C - 1 + end). handle_bump_msg({From, MoreCredit}) -> - update({credit_from, From}, 0, - fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From), - C + MoreCredit; - (C) -> C + MoreCredit - end). - -blocked() -> get(credit_blocked, []) =/= []. + ?UPDATE({credit_from, From}, 0, C, + if C =< 0 andalso C + MoreCredit > 0 -> unblock(From), + C + MoreCredit; + true -> C + MoreCredit + end). + +blocked() -> case get(credit_blocked) of + undefined -> false; + [] -> false; + _ -> true + end. peer_down(Peer) -> %% In theory we could also remove it from credit_deferred here, but it @@ -105,24 +125,17 @@ grant(To, Quantity) -> Msg = {bump_credit, {self(), Quantity}}, case blocked() of false -> To ! Msg; - true -> update(credit_deferred, [], - fun (Deferred) -> [{To, Msg} | Deferred] end) + true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred]) end. -block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end). +block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). unblock(From) -> - update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end), + ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]), case blocked() of - false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], - erase(credit_deferred); + false -> case erase(credit_deferred) of + undefined -> ok; + Credits -> [To ! Msg || {To, Msg} <- Credits] + end; true -> ok end. - -get(Key, Default) -> - case get(Key) of - undefined -> Default; - Value -> Value - end. - -update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok. diff --git a/src/pmon.erl b/src/pmon.erl index 1aeebb72..37898119 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -19,6 +19,8 @@ -export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, monitored/1, is_empty/1]). +-compile({no_auto_import, [monitor/2]}). + -ifdef(use_specs). %%---------------------------------------------------------------------------- @@ -48,7 +50,9 @@ monitor(Item, M) -> false -> dict:store(Item, erlang:monitor(process, Item), M) end. -monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items). +monitor_all([], M) -> M; %% optimisation +monitor_all([Item], M) -> monitor(Item, M); %% optimisation +monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items). demonitor(Item, M) -> case dict:find(Item, M) of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1b6cc223..fbe146e8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). --export([start_mirroring/1, stop_mirroring/1]). +-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -173,6 +173,8 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). +-spec(sync_mirrors/1 :: (pid()) -> + 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). -endif. @@ -302,6 +304,8 @@ add_default_binding(#amqqueue{name = QueueName}) -> key = RoutingKey, args = []}). +lookup([]) -> []; %% optimisation +lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation lookup(Names) when is_list(Names) -> %% Normally we'd call mnesia:dirty_read/1 here, but that is quite %% expensive for reasons explained in rabbit_misc:dirty_read/1. @@ -596,6 +600,8 @@ set_maximum_since_use(QPid, Age) -> start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring). stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). +sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0bef1e4b..43fd7e3f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1172,6 +1172,21 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(requeue(AckTags, ChPid, State)); +handle_call(sync_mirrors, _From, + State = #q{backing_queue = rabbit_mirror_queue_master = BQ, + backing_queue_state = BQS}) -> + S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, + case BQ:depth(BQS) - BQ:len(BQS) of + 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of + {ok, BQS1} -> reply(ok, S(BQS1)); + {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} + end; + _ -> reply({error, pending_acks}, State) + end; + +handle_call(sync_mirrors, _From, State) -> + reply({error, not_mirrored}, State); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 617ea25f..1af60de8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,14 +33,15 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - conn_name, limiter, tx_status, next_tag, unacked_message_q, - uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, + conn_name, limiter, tx, next_tag, unacked_message_q, user, virtual_host, most_recently_declared_queue, queue_names, queue_monitors, consumer_mapping, blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). +-record(tx, {msgs, acks, nacks}). + -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(STATISTICS_KEYS, @@ -65,6 +66,12 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(INCR_STATS(Incs, Measure, State), + case rabbit_event:stats_level(State, #ch.stats_timer) of + fine -> incr_stats(Incs, Measure); + _ -> ok + end). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -186,12 +193,9 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, conn_pid = ConnPid, conn_name = ConnName, limiter = Limiter, - tx_status = none, + tx = none, next_tag = 1, unacked_message_q = queue:new(), - uncommitted_message_q = queue:new(), - uncommitted_acks = [], - uncommitted_nacks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -314,9 +318,12 @@ handle_cast({deliver, ConsumerTag, AckRequired, handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); + handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), - noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). + Timeout = case C of [] -> hibernate; _ -> 0 end, + %% NB: don't call noreply/1 since we don't want to send confirms. + {noreply, ensure_stats_timer(State1), Timeout}. handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), @@ -327,8 +334,10 @@ handle_info(timeout, State) -> handle_info(emit_stats, State) -> emit_stats(State), - noreply([ensure_stats_timer], - rabbit_event:reset_stats_timer(State, #ch.stats_timer)); + State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer), + %% NB: don't call noreply/1 since we don't want to kick off the + %% stats timer. + {noreply, send_confirms(State1), hibernate}; handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), @@ -372,30 +381,11 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> reply(Reply, [], NewState). - -reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate). - -reply(Reply, Mask, NewState, Timeout) -> - {reply, Reply, next_state(Mask, NewState), Timeout}. +reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. -noreply(NewState) -> noreply([], NewState). +noreply(NewState) -> {noreply, next_state(NewState), hibernate}. -noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate). - -noreply(Mask, NewState, Timeout) -> - {noreply, next_state(Mask, NewState), Timeout}. - --define(MASKED_CALL(Fun, Mask, State), - case lists:member(Fun, Mask) of - true -> State; - false -> Fun(State) - end). - -next_state(Mask, State) -> - State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State), - State2 = ?MASKED_CALL(send_confirms, Mask, State1), - State2. +next_state(State) -> ensure_stats_timer(send_confirms(State)). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats). @@ -555,11 +545,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -record_confirm(undefined, _, State) -> - State; -record_confirm(MsgSeqNo, XName, State) -> - record_confirms([{MsgSeqNo, XName}], State). - record_confirms([], State) -> State; record_confirms(MXs, State = #ch{confirmed = C}) -> @@ -599,8 +584,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> %% while waiting for the reply to a synchronous command, we generally %% do allow this...except in the case of a pending tx.commit, where %% it could wreak havoc. -handle_method(_Method, _, #ch{tx_status = TxStatus}) - when TxStatus =/= none andalso TxStatus =/= in_progress -> +handle_method(_Method, _, #ch{tx = Tx}) + when Tx =:= committing orelse Tx =:= failed -> rabbit_misc:protocol_error( channel_error, "unexpected command while processing 'tx.commit'", []); @@ -614,7 +599,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, Content, State = #ch{virtual_host = VHostPath, - tx_status = TxStatus, + tx = Tx, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -628,7 +613,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_user_id_header(Props, State), check_expiration_header(Props), {MsgSeqNo, State1} = - case {TxStatus, ConfirmEnabled} of + case {Tx, ConfirmEnabled} of {none, false} -> {undefined, State}; {_, _} -> SeqNo = State#ch.publish_seqno, {SeqNo, State#ch{publish_seqno = SeqNo + 1}} @@ -638,12 +623,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_trace:tap_in(Message, TraceState), Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), + DQ = {Delivery, QNames}, {noreply, - case TxStatus of - none -> deliver_to_queues({Delivery, QNames}, State1); - in_progress -> TMQ = State1#ch.uncommitted_message_q, - NewTMQ = queue:in({Delivery, QNames}, TMQ), - State1#ch{uncommitted_message_q = NewTMQ} + case Tx of + none -> deliver_to_queues(DQ, State1); + #tx{msgs = Msgs} -> Msgs1 = queue:in(DQ, Msgs), + State1#ch{tx = Tx#tx{msgs = Msgs1}} end}; {error, Reason} -> precondition_failed("invalid message: ~p", [Reason]) @@ -657,15 +642,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, - _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> + _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, {noreply, - case TxStatus of - none -> ack(Acked, State1), - State1; - in_progress -> State1#ch{uncommitted_acks = - Acked ++ State1#ch.uncommitted_acks} + case Tx of + none -> ack(Acked, State1), + State1; + #tx{acks = Acks} -> State1#ch{tx = Tx#tx{acks = Acked ++ Acks}} end}; handle_method(#'basic.get'{queue = QueueNameBin, @@ -1043,34 +1027,37 @@ handle_method(#'queue.purge'{queue = QueueNameBin, handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> precondition_failed("cannot switch from confirm to tx mode"); +handle_method(#'tx.select'{}, _, State = #ch{tx = none}) -> + {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}}; + handle_method(#'tx.select'{}, _, State) -> - {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}}; + {reply, #'tx.select_ok'{}, State}; -handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> +handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> precondition_failed("channel is not transactional"); -handle_method(#'tx.commit'{}, _, - State = #ch{uncommitted_message_q = TMQ, - uncommitted_acks = TAL, - uncommitted_nacks = TNL, - limiter = Limiter}) -> - State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), - ack(TAL, State1), +handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs, + acks = Acks, + nacks = Nacks}, + limiter = Limiter}) -> + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), + ack(Acks, State1), lists:foreach( - fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL), - {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; + fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, Nacks), + {noreply, maybe_complete_tx(State1#ch{tx = committing})}; -handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> +handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> precondition_failed("channel is not transactional"); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_acks = TAL, - uncommitted_nacks = TNL}) -> - TNL1 = lists:append([L || {_, L} <- TNL]), - UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))), - {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; - -handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> + tx = #tx{acks = Acks, + nacks = Nacks}}) -> + NacksL = lists:append([L || {_, L} <- Nacks]), + UAMQ1 = queue:from_list(lists:usort(Acks ++ NacksL ++ queue:to_list(UAMQ))), + {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1, + tx = new_tx()}}; + +handle_method(#'confirm.select'{}, _, #ch{tx = #tx{}}) -> precondition_failed("cannot switch from tx to confirm mode"); handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> @@ -1218,17 +1205,15 @@ basic_return(#basic_message{exchange_name = ExchangeName, Content). reject(DeliveryTag, Requeue, Multiple, - State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> + State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, {noreply, - case TxStatus of - none -> - reject(Requeue, Acked, State1#ch.limiter), - State1; - in_progress -> - State1#ch{uncommitted_nacks = - [{Requeue, Acked} | State1#ch.uncommitted_nacks]} + case Tx of + none -> reject(Requeue, Acked, State1#ch.limiter), + State1; + #tx{nacks = Nacks} -> Nacks1 = [{Requeue, Acked} | Nacks], + State1#ch{tx = Tx#tx{nacks = Nacks1}} end}. reject(Requeue, Acked, Limiter) -> @@ -1243,14 +1228,14 @@ record_sent(ConsumerTag, AckRequired, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, trace_state = TraceState}) -> - incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> incr_stats([{queue_stats, QName, 1}], redeliver, State); + true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State); false -> ok end, rabbit_trace:tap_out(Msg, TraceState), @@ -1294,11 +1279,9 @@ ack(Acked, State = #ch{queue_names = QNames}) -> end end, [], Acked), ok = notify_limiter(State#ch.limiter, Acked), - incr_stats(Incs, ack, State). + ?INCR_STATS(Incs, ack, State). -new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_acks = [], - uncommitted_nacks = []}. +new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1375,43 +1358,46 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ XName, MsgSeqNo, Message, State#ch{queue_names = QNames1, queue_monitors = QMons1}), - incr_stats([{exchange_stats, XName, 1} | - [{queue_exchange_stats, {QName, XName}, 1} || - QPid <- DeliveredQPids, - {ok, QName} <- [dict:find(QPid, QNames1)]]], - publish, State1), + ?INCR_STATS([{exchange_stats, XName, 1} | + [{queue_exchange_stats, {QName, XName}, 1} || + QPid <- DeliveredQPids, + {ok, QName} <- [dict:find(QPid, QNames1)]]], + publish, State1), State1. -process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State, no_route), - incr_stats([{exchange_stats, XName, 1}], return_unroutable, State), - record_confirm(MsgSeqNo, XName, State); -process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> - record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> State; +process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> + record_confirms([{MsgSeqNo, XName}], State); process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, - State#ch.unconfirmed)}. + State#ch.unconfirmed)}; +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State, no_route), + ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State), + case MsgSeqNo of + undefined -> State; + _ -> record_confirms([{MsgSeqNo, XName}], State) + end. send_nacks([], State) -> State; -send_nacks(MXs, State = #ch{tx_status = none}) -> +send_nacks(MXs, State = #ch{tx = none}) -> coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, multiple = Multiple} end, State); send_nacks(_, State) -> - maybe_complete_tx(State#ch{tx_status = failed}). + maybe_complete_tx(State#ch{tx = failed}). -send_confirms(State = #ch{tx_status = none, confirmed = []}) -> +send_confirms(State = #ch{tx = none, confirmed = []}) -> State; -send_confirms(State = #ch{tx_status = none, confirmed = C}) -> +send_confirms(State = #ch{tx = none, confirmed = C}) -> MsgSeqNos = lists:foldl( fun ({MsgSeqNo, XName}, MSNs) -> - incr_stats([{exchange_stats, XName, 1}], confirm, State), + ?INCR_STATS([{exchange_stats, XName, 1}], confirm, State), [MsgSeqNo | MSNs] end, [], lists:append(C)), send_confirms(MsgSeqNos, State#ch{confirmed = []}); @@ -1447,7 +1433,7 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -maybe_complete_tx(State = #ch{tx_status = in_progress}) -> +maybe_complete_tx(State = #ch{tx = #tx{}}) -> State; maybe_complete_tx(State = #ch{unconfirmed = UC}) -> case dtree:is_empty(UC) of @@ -1455,16 +1441,16 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) -> true -> complete_tx(State#ch{confirmed = []}) end. -complete_tx(State = #ch{tx_status = committing}) -> +complete_tx(State = #ch{tx = committing}) -> ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}), - State#ch{tx_status = in_progress}; -complete_tx(State = #ch{tx_status = failed}) -> + State#ch{tx = new_tx()}; +complete_tx(State = #ch{tx = failed}) -> {noreply, State1} = handle_exception( rabbit_misc:amqp_error( precondition_failed, "partial tx completion", [], 'tx.commit'), State), - State1#ch{tx_status = in_progress}. + State1#ch{tx = new_tx()}. infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -1473,19 +1459,16 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{tx_status = TE}) -> TE =/= none; +i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); -i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> - dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed = UC}) -> - dtree:size(UC); -i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> - queue:len(UAMQ); -i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> - queue:len(TMQ); -i(acks_uncommitted, #ch{uncommitted_acks = TAL}) -> - length(TAL); +i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); +i(messages_uncommitted, #ch{tx = #tx{msgs = Msgs}}) -> queue:len(Msgs); +i(messages_uncommitted, #ch{}) -> 0; +i(acks_uncommitted, #ch{tx = #tx{acks = Acks}}) -> length(Acks); +i(acks_uncommitted, #ch{}) -> 0; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> @@ -1496,12 +1479,8 @@ i(Item, _) -> name(#ch{conn_name = ConnName, channel = Channel}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). -incr_stats(Incs, Measure, State) -> - case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> [update_measures(Type, Key, Inc, Measure) || - {Type, Key, Inc} <- Incs]; - _ -> ok - end. +incr_stats(Incs, Measure) -> + [update_measures(Type, Key, Inc, Measure) || {Type, Key, Inc} <- Incs]. update_measures(Type, Key, Inc, Measure) -> Measures = case get({Type, Key}) of diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 669a0787..5bde996e 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -17,7 +17,7 @@ -module(rabbit_control_main). -include("rabbit.hrl"). --export([start/0, stop/0, action/5]). +-export([start/0, stop/0, action/5, sync_queue/1]). -define(RPC_TIMEOUT, infinity). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -50,6 +50,7 @@ update_cluster_nodes, {forget_cluster_node, [?OFFLINE_DEF]}, cluster_status, + {sync_queue, [?VHOST_DEF]}, add_user, delete_user, @@ -168,7 +169,7 @@ start() -> {'EXIT', {badarg, _}} -> print_error("invalid parameter: ~p", [Args]), usage(); - {error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) -> + {error, {Problem, Reason}} when is_atom(Problem), is_binary(Reason) -> %% We handle this common case specially to avoid ~p since %% that has i18n issues print_error("~s: ~s", [Problem, Reason]), @@ -280,6 +281,12 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> rpc_call(Node, rabbit_mnesia, forget_cluster_node, [ClusterNode, RemoveWhenOffline]); +action(sync_queue, Node, [Q], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Synchronising queue \"~s\" in vhost \"~s\"", [Q, VHost]), + rpc_call(Node, rabbit_control_main, sync_queue, + [rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q))]); + action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), wait_for_application(Node, PidFile, rabbit_and_plugins, Inform); @@ -513,6 +520,10 @@ action(eval, Node, [Expr], _Opts, _Inform) -> format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). +sync_queue(Q) -> + rabbit_amqqueue:with( + Q, fun(#amqqueue{pid = QPid}) -> rabbit_amqqueue:sync_mirrors(QPid) end). + %%---------------------------------------------------------------------------- wait_for_application(Node, PidFile, Application, Inform) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index e72cbafe..9339161f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -310,22 +310,19 @@ route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}}, [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; route(X = #exchange{name = XName}, Delivery) -> - route1(Delivery, {queue:from_list([X]), XName, []}). - -route1(Delivery, {WorkList, SeenXs, QNames}) -> - case queue:out(WorkList) of - {empty, _WorkList} -> - lists:usort(QNames); - {{value, X = #exchange{type = Type}}, WorkList1} -> - DstNames = process_alternate( - X, ((type_to_module(Type)):route(X, Delivery))), - route1(Delivery, - lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, - DstNames)) - end. + route1(Delivery, {[X], XName, []}). + +route1(_, {[], _, QNames}) -> + lists:usort(QNames); +route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> + DstNames = process_alternate( + X, ((type_to_module(Type)):route(X, Delivery))), + route1(Delivery, + lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames}, + DstNames)). process_alternate(#exchange{arguments = []}, Results) -> %% optimisation - Results; + Results; process_alternate(#exchange{name = XName, arguments = Args}, []) -> case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of undefined -> []; @@ -339,23 +336,25 @@ process_route(#resource{kind = exchange} = XName, Acc; process_route(#resource{kind = exchange} = XName, {WorkList, #resource{kind = exchange} = SeenX, QNames}) -> - {case lookup(XName) of - {ok, X} -> queue:in(X, WorkList); - {error, not_found} -> WorkList - end, gb_sets:from_list([SeenX, XName]), QNames}; + {cons_if_present(XName, WorkList), + gb_sets:from_list([SeenX, XName]), QNames}; process_route(#resource{kind = exchange} = XName, {WorkList, SeenXs, QNames} = Acc) -> case gb_sets:is_element(XName, SeenXs) of true -> Acc; - false -> {case lookup(XName) of - {ok, X} -> queue:in(X, WorkList); - {error, not_found} -> WorkList - end, gb_sets:add_element(XName, SeenXs), QNames} + false -> {cons_if_present(XName, WorkList), + gb_sets:add_element(XName, SeenXs), QNames} end; process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> {WorkList, SeenXs, [QName | QNames]}. +cons_if_present(XName, L) -> + case lookup(XName) of + {ok, X} -> [X | L]; + {error, not_found} -> L + end. + call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:read({rabbit_exchange, XName}) of diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index cedbbdb3..8ee9ad5b 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -104,8 +104,6 @@ advance_blocks({B1, B2, B3, B4}, I) -> B5 = erlang:phash2({B1, I}, 4294967296), {{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}. -blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>. - %% generate a GUID. This function should be used when performance is a %% priority and predictability is not an issue. Otherwise use %% gen_secure/0. @@ -114,14 +112,15 @@ gen() -> %% time we need a new guid we rotate them producing a new hash %% with the aid of the counter. Look at the comments in %% advance_blocks/2 for details. - {BS, I} = case get(guid) of - undefined -> <<B1:32, B2:32, B3:32, B4:32>> = - erlang:md5(term_to_binary(fresh())), - {{B1,B2,B3,B4}, 0}; - {BS0, I0} -> advance_blocks(BS0, I0) - end, - put(guid, {BS, I}), - blocks_to_binary(BS). + case get(guid) of + undefined -> <<B1:32, B2:32, B3:32, B4:32>> = Res = + erlang:md5(term_to_binary(fresh())), + put(guid, {{B1, B2, B3, B4}, 0}), + Res; + {BS, I} -> {{B1, B2, B3, B4}, _} = S = advance_blocks(BS, I), + put(guid, S), + <<B1:32, B2:32, B3:32, B4:32>> + end. %% generate a non-predictable GUID. %% diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e857f395..0df7ea1c 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,15 +26,16 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]). +-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). --export([init_with_existing_bq/3, stop_mirroring/1]). +-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]). -behaviour(rabbit_backing_queue). -include("rabbit.hrl"). --record(state, { gm, +-record(state, { name, + gm, coordinator, backing_queue, backing_queue_state, @@ -49,7 +50,8 @@ -type(death_fun() :: fun ((pid()) -> 'ok')). -type(depth_fun() :: fun (() -> 'ok')). --type(master_state() :: #state { gm :: pid(), +-type(master_state() :: #state { name :: rabbit_amqqueue:name(), + gm :: pid(), coordinator :: pid(), backing_queue :: atom(), backing_queue_state :: any(), @@ -58,14 +60,16 @@ known_senders :: set() }). --spec(promote_backing_queue_state/7 :: - (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) -> - master_state()). +-spec(promote_backing_queue_state/8 :: + (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], dict(), + [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). -spec(depth_fun/0 :: () -> depth_fun()). -spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) -> master_state()). -spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}). +-spec(sync_mirrors/1 :: (master_state()) -> + {'ok', master_state()} | {stop, any(), master_state()}). -endif. @@ -106,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> end), {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), - #state { gm = GM, + #state { name = QName, + gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, @@ -121,6 +126,29 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. +sync_mirrors(State = #state { name = QName, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + Log = fun (Fmt, Params) -> + rabbit_log:info("Synchronising ~s: " ++ Fmt ++ "~n", + [rabbit_misc:rs(QName) | Params]) + end, + Log("~p messages to synchronise", [BQ:len(BQS)]), + {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), + Ref = make_ref(), + Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids), + gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), + S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, + case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of + {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; + {sync_died, R, BQS1} -> Log("~p", [R]), + {ok, S(BQS1)}; + {already_synced, BQS1} -> {ok, S(BQS1)}; + {ok, BQS1} -> Log("complete", []), + {ok, S(BQS1)} + end. + terminate({shutdown, dropped} = Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -144,17 +172,14 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, State), State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. -stop_all_slaves(Reason, #state{gm = GM}) -> - Info = gm:info(GM), - Slaves = [Pid || Pid <- proplists:get_value(group_members, Info), - node(Pid) =/= node()], - MRefs = [erlang:monitor(process, S) || S <- Slaves], +stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> + {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), + MRefs = [erlang:monitor(process, SPid) || SPid <- SPids], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), [receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs], %% Normally when we remove a slave another slave or master will %% notice and update Mnesia. But we just removed them all, and %% have stopped listening ourselves. So manually clean up. - QName = proplists:get_value(group_name, Info), rabbit_misc:execute_mnesia_transaction( fun () -> [Q] = mnesia:read({rabbit_queue, QName}), @@ -385,17 +410,18 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% Other exported functions %% --------------------------------------------------------------------------- -promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) -> +promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), Len = BQ:len(BQS1), Depth = BQ:depth(BQS1), true = Len == Depth, %% ASSERTION: everything must have been requeued ok = gm:broadcast(GM, {depth, Depth}), - #state { gm = GM, + #state { name = QName, + gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS1, - seen_status = SeenStatus, + seen_status = Seen, confirmed = [], known_senders = sets:from_list(KS) }. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9354f485..53564f09 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -222,6 +222,29 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, end, noreply(maybe_enqueue_message(Delivery, State)); +handle_cast({sync_start, Ref, Syncer}, + State = #state { depth_delta = DD, + backing_queue = BQ, + backing_queue_state = BQS }) -> + State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), + S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined, + rate_timer_ref = TRefN, + backing_queue_state = BQSN} end, + %% [0] We can only sync when there are no pending acks + case rabbit_mirror_queue_sync:slave( + DD, Ref, TRef, Syncer, BQ, BQS, + fun (BQN, BQSN) -> + BQSN1 = update_ram_duration(BQN, BQSN), + TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), + {TRefN, BQSN1} + end) of + denied -> noreply(State1); + {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0] + {failed, Res} -> noreply(S(Res)); + {stop, Reason, Res} -> {stop, Reason, S(Res)} + end; + handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); @@ -232,15 +255,10 @@ handle_cast({set_ram_duration_target, Duration}, BQS1 = BQ:set_ram_duration_target(Duration, BQS), noreply(State #state { backing_queue_state = BQS1 }). -handle_info(update_ram_duration, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State #state { rate_timer_ref = just_measured, - backing_queue_state = BQS2 }); +handle_info(update_ram_duration, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + noreply(State#state{rate_timer_ref = just_measured, + backing_queue_state = update_ram_duration(BQ, BQS)}); handle_info(sync_timeout, State) -> noreply(backing_queue_timeout( @@ -357,6 +375,11 @@ handle_msg([_SPid], _From, process_death) -> handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; +handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) -> + case lists:member(SPid, SPids) of + true -> gen_server2:cast(SPid, {sync_start, Ref, Syncer}); + false -> ok + end; handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). @@ -528,7 +551,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, AckTags, SS, MPids), + QName, CPid, BQ, BQS, GM, AckTags, SS, MPids), MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); @@ -815,6 +838,12 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) -> true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }). +update_ram_duration(BQ, BQS) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQ:set_ram_duration_target(DesiredDuration, BQS1). + record_synchronised(#amqqueue { name = QName }) -> Self = self(), rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl new file mode 100644 index 00000000..508d46e9 --- /dev/null +++ b/src/rabbit_mirror_queue_sync.erl @@ -0,0 +1,232 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_sync). + +-include("rabbit.hrl"). + +-export([master_prepare/3, master_go/5, slave/7]). + +-define(SYNC_PROGRESS_INTERVAL, 1000000). + +%% There are three processes around, the master, the syncer and the +%% slave(s). The syncer is an intermediary, linked to the master in +%% order to make sure we do not mess with the master's credit flow or +%% set of monitors. +%% +%% Interactions +%% ------------ +%% +%% '*' indicates repeating messages. All are standard Erlang messages +%% except sync_start which is sent over GM to flush out any other +%% messages that we might have sent that way already. (credit) is the +%% usual credit_flow bump message every so often. +%% +%% Master Syncer Slave(s) +%% sync_mirrors -> || || +%% (from channel) || -- (spawns) --> || || +%% || --------- sync_start (over GM) -------> || +%% || || <--- sync_ready ---- || +%% || || (or) || +%% || || <--- sync_deny ----- || +%% || <--- ready ---- || || +%% || <--- next* ---- || || } +%% || ---- msg* ----> || || } loop +%% || || ---- sync_msg* ----> || } +%% || || <--- (credit)* ----- || } +%% || <--- next ---- || || +%% || ---- done ----> || || +%% || || -- sync_complete --> || +%% || (Dies) || + +-ifdef(use_specs). + +-type(log_fun() :: fun ((string(), [any()]) -> 'ok')). +-type(bq() :: atom()). +-type(bqs() :: any()). + +-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). +-spec(master_go/5 :: (pid(), reference(), log_fun(), bq(), bqs()) -> + {'already_synced', bqs()} | {'ok', bqs()} | + {'shutdown', any(), bqs()} | + {'sync_died', any(), bqs()}). +-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(), + bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) -> + 'denied' | + {'ok' | 'failed', {timer:tref(), bqs()}} | + {'stop', any(), {timer:tref(), bqs()}}). + +-endif. + +%% --------------------------------------------------------------------------- +%% Master + +master_prepare(Ref, Log, SPids) -> + MPid = self(), + spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). + +master_go(Syncer, Ref, Log, BQ, BQS) -> + Args = {Syncer, Ref, Log, rabbit_misc:get_parent()}, + receive + {'EXIT', Syncer, normal} -> {already_synced, BQS}; + {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; + {ready, Syncer} -> master_go0(Args, BQ, BQS) + end. + +master_go0(Args, BQ, BQS) -> + case BQ:fold(fun (Msg, MsgProps, Acc) -> + master_send(Msg, MsgProps, Args, Acc) + end, {0, erlang:now()}, BQS) of + {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; + {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; + {_, BQS1} -> master_done(Args, BQS1) + end. + +master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) -> + T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of + true -> Log("~p messages", [I]), + erlang:now(); + false -> Last + end, + receive + {'$gen_cast', {set_maximum_since_use, Age}} -> + ok = file_handle_cache:set_maximum_since_use(Age) + after 0 -> + ok + end, + receive + {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, + {cont, {I + 1, T}}; + {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; + {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} + end. + +master_done({Syncer, Ref, _Log, Parent}, BQS) -> + receive + {next, Ref} -> unlink(Syncer), + Syncer ! {done, Ref}, + receive {'EXIT', Syncer, _} -> ok + after 0 -> ok + end, + {ok, BQS}; + {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS}; + {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS} + end. + +%% Master +%% --------------------------------------------------------------------------- +%% Syncer + +syncer(Ref, Log, MPid, SPids) -> + [erlang:monitor(process, SPid) || SPid <- SPids], + %% We wait for a reply from the slaves so that we know they are in + %% a receive block and will thus receive messages we send to them + %% *without* those messages ending up in their gen_server2 pqueue. + case [SPid || SPid <- SPids, + receive + {sync_ready, Ref, SPid} -> true; + {sync_deny, Ref, SPid} -> false; + {'DOWN', _, process, SPid, _} -> false + end] of + [] -> Log("all slaves already synced", []); + SPids1 -> MPid ! {ready, self()}, + Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]), + syncer_loop(Ref, MPid, SPids1) + end. + +syncer_loop(Ref, MPid, SPids) -> + MPid ! {next, Ref}, + receive + {msg, Ref, Msg, MsgProps} -> + SPids1 = wait_for_credit(SPids), + [begin + credit_flow:send(SPid), + SPid ! {sync_msg, Ref, Msg, MsgProps} + end || SPid <- SPids1], + syncer_loop(Ref, MPid, SPids1); + {done, Ref} -> + [SPid ! {sync_complete, Ref} || SPid <- SPids] + end. + +wait_for_credit(SPids) -> + case credit_flow:blocked() of + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + wait_for_credit(SPids); + {'DOWN', _, process, SPid, _} -> + credit_flow:peer_down(SPid), + wait_for_credit(lists:delete(SPid, SPids)) + end; + false -> SPids + end. + +%% Syncer +%% --------------------------------------------------------------------------- +%% Slave + +slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) -> + Syncer ! {sync_deny, Ref, self()}, + denied; + +slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> + MRef = erlang:monitor(process, Syncer), + Syncer ! {sync_ready, Ref, self()}, + {_MsgCount, BQS1} = BQ:purge(BQS), + slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration, + rabbit_misc:get_parent()}, TRef, BQS1). + +slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, + TRef, BQS) -> + receive + {'DOWN', MRef, process, Syncer, _Reason} -> + %% If the master dies half way we are not in the usual + %% half-synced state (with messages nearer the tail of the + %% queue); instead we have ones nearer the head. If we then + %% sync with a newly promoted master, or even just receive + %% messages from it, we have a hole in the middle. So the + %% only thing to do here is purge. + {_MsgCount, BQS1} = BQ:purge(BQS), + credit_flow:peer_down(Syncer), + {failed, {TRef, BQS1}}; + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + slave_sync_loop(Args, TRef, BQS); + {sync_complete, Ref} -> + erlang:demonitor(MRef, [flush]), + credit_flow:peer_down(Syncer), + {ok, {TRef, BQS}}; + {'$gen_cast', {set_maximum_since_use, Age}} -> + ok = file_handle_cache:set_maximum_since_use(Age), + slave_sync_loop(Args, TRef, BQS); + {'$gen_cast', {set_ram_duration_target, Duration}} -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + slave_sync_loop(Args, TRef, BQS1); + update_ram_duration -> + {TRef1, BQS1} = UpdateRamDuration(BQ, BQS), + slave_sync_loop(Args, TRef1, BQS1); + {sync_msg, Ref, Msg, Props} -> + credit_flow:ack(Syncer), + Props1 = Props#message_properties{needs_confirming = false}, + BQS1 = BQ:publish(Msg, Props1, true, none, BQS), + slave_sync_loop(Args, TRef, BQS1); + {'EXIT', Parent, Reason} -> + {stop, Reason, {TRef, BQS}}; + %% If the master throws an exception + {'$gen_cast', {gm, {delete_and_terminate, Reason}}} -> + BQ:delete_and_terminate(Reason, BQS), + {stop, Reason, {TRef, undefined}} + end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 4efde50e..ce3e3802 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -67,6 +67,7 @@ -export([check_expiry/1]). -export([base64url/1]). -export([interval_operation/4]). +-export([get_parent/0]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -241,7 +242,7 @@ -spec(interval_operation/4 :: ({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer()) -> {any(), non_neg_integer()}). - +-spec(get_parent/0 :: () -> pid()). -endif. %%---------------------------------------------------------------------------- @@ -352,13 +353,12 @@ set_table_value(Table, Key, Type, Value) -> sort_field_table( lists:keystore(Key, 1, Table, {Key, Type, Value})). -r(#resource{virtual_host = VHostPath}, Kind, Name) - when is_binary(Name) -> +r(#resource{virtual_host = VHostPath}, Kind, Name) -> #resource{virtual_host = VHostPath, kind = Kind, name = Name}; -r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) -> +r(VHostPath, Kind, Name) -> #resource{virtual_host = VHostPath, kind = Kind, name = Name}. -r(VHostPath, Kind) when is_binary(VHostPath) -> +r(VHostPath, Kind) -> #resource{virtual_host = VHostPath, kind = Kind, name = '_'}. r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) -> @@ -1046,3 +1046,37 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) -> {false, false} -> lists:max([IdealInterval, round(LastInterval / 1.5)]) end}. + +%% ------------------------------------------------------------------------- +%% Begin copypasta from gen_server2.erl + +get_parent() -> + case get('$ancestors') of + [Parent | _] when is_pid (Parent) -> Parent; + [Parent | _] when is_atom(Parent) -> name_to_pid(Parent); + _ -> exit(process_was_not_started_by_proc_lib) + end. + +name_to_pid(Name) -> + case whereis(Name) of + undefined -> case whereis_name(Name) of + undefined -> exit(could_not_find_registerd_name); + Pid -> Pid + end; + Pid -> Pid + end. + +whereis_name(Name) -> + case ets:lookup(global_names, Name) of + [{_Name, Pid, _Method, _RPid, _Ref}] -> + if node(Pid) == node() -> case erlang:is_process_alive(Pid) of + true -> Pid; + false -> undefined + end; + true -> Pid + end; + [] -> undefined + end. + +%% End copypasta from gen_server2.erl +%% ------------------------------------------------------------------------- diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 928786e9..13e8feff 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -35,12 +35,16 @@ %%-------------------------------------------------------------------------- --record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv, +-record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state, conserve_resources, - last_blocked_by, last_blocked_at, host, peer_host, - port, peer_port}). + channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}). + +-record(connection, {name, host, peer_host, port, peer_port, + protocol, user, timeout_sec, frame_max, vhost, + client_properties, capabilities, + auth_mechanism, auth_state}). + +-record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, @@ -205,15 +209,21 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, {PeerHost, PeerPort, Host, Port} = socket_ends(Sock), State = #v1{parent = Parent, sock = ClientSock, - name = list_to_binary(Name), connection = #connection{ + name = list_to_binary(Name), + host = Host, + peer_host = PeerHost, + port = Port, + peer_port = PeerPort, protocol = none, user = none, timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, vhost = none, client_properties = none, - capabilities = []}, + capabilities = [], + auth_mechanism = none, + auth_state = none}, callback = uninitialized_callback, recv_len = 0, pending_recv = false, @@ -224,15 +234,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, start_heartbeat_fun = StartHeartbeatFun, buf = [], buf_len = 0, - auth_mechanism = none, - auth_state = none, - conserve_resources = false, - last_blocked_by = none, - last_blocked_at = never, - host = Host, - peer_host = PeerHost, - port = Port, - peer_port = PeerPort}, + throttle = #throttle{ + conserve_resources = false, + last_blocked_by = none, + last_blocked_at = never}}, try ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end), recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( @@ -288,8 +293,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {other, Other} -> handle_other(Other, Deb, State) end. -handle_other({conserve_resources, Conserve}, Deb, State) -> - recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve})); +handle_other({conserve_resources, Conserve}, Deb, + State = #v1{throttle = Throttle}) -> + Throttle1 = Throttle#throttle{conserve_resources = Conserve}, + recvloop(Deb, control_throttle(State#v1{throttle = Throttle1})); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -372,29 +379,31 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -control_throttle(State = #v1{connection_state = CS, - conserve_resources = Mem}) -> - case {CS, Mem orelse credit_flow:blocked()} of +control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> + case {CS, (Throttle#throttle.conserve_resources orelse + credit_flow:blocked())} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), State#v1{connection_state = running}; - {blocked, true} -> update_last_blocked_by(State); + {blocked, true} -> State#v1{throttle = update_last_blocked_by( + Throttle)}; {_, _} -> State end. -maybe_block(State = #v1{connection_state = blocking}) -> +maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), - update_last_blocked_by(State#v1{connection_state = blocked, - last_blocked_at = erlang:now()}); + State#v1{connection_state = blocked, + throttle = update_last_blocked_by( + Throttle#throttle{last_blocked_at = erlang:now()})}; maybe_block(State) -> State. -update_last_blocked_by(State = #v1{conserve_resources = true}) -> - State#v1{last_blocked_by = resource}; -update_last_blocked_by(State = #v1{conserve_resources = false}) -> - State#v1{last_blocked_by = flow}. +update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) -> + Throttle#throttle{last_blocked_by = resource}; +update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) -> + Throttle#throttle{last_blocked_by = flow}. %%-------------------------------------------------------------------------- %% error handling / termination @@ -531,9 +540,10 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) -> %%-------------------------------------------------------------------------- create_channel(Channel, State) -> - #v1{sock = Sock, name = Name, queue_collector = Collector, + #v1{sock = Sock, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, - connection = #connection{protocol = Protocol, + connection = #connection{name = Name, + protocol = Protocol, frame_max = FrameMax, user = User, vhost = VHost, @@ -594,40 +604,36 @@ handle_frame(Type, Channel, Payload, State) -> unexpected_frame(Type, Channel, Payload, State). process_frame(Frame, Channel, State) -> - {ChPid, AState} = case get({channel, Channel}) of + ChKey = {channel, Channel}, + {ChPid, AState} = case get(ChKey) of undefined -> create_channel(Channel, State); Other -> Other end, - case process_channel_frame(Frame, ChPid, AState) of - {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {error, Reason} -> handle_exception(State, Channel, Reason) - end. - -process_channel_frame(Frame, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> {ok, NewAState}; - {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - {ok, NewAState}; - {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( - ChPid, Method, Content), - {ok, NewAState}; - {error, Reason} -> {error, Reason} + {ok, NewAState} -> + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, NewAState} -> + rabbit_channel:do(ChPid, Method), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, Content, NewAState} -> + rabbit_channel:do_flow(ChPid, Method, Content), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, control_throttle(State)); + {error, Reason} -> + handle_exception(State, Channel, Reason) end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), - control_throttle(State); -post_process_frame({method, MethodName, _}, _ChPid, - State = #v1{connection = #connection{ - protocol = Protocol}}) -> - case Protocol:method_has_content(MethodName) of - true -> erlang:bump_reductions(2000), - maybe_block(control_throttle(State)); - false -> control_throttle(State) - end; + State; +post_process_frame({content_header, _, _, _, _}, _ChPid, State) -> + maybe_block(State); +post_process_frame({content_body, _}, _ChPid, State) -> + maybe_block(State); post_process_frame(_Frame, _ChPid, State) -> - control_throttle(State). + State. %%-------------------------------------------------------------------------- @@ -746,13 +752,13 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, {table, Capabilities1} -> Capabilities1; _ -> [] end, - State = State0#v1{auth_mechanism = AuthMechanism, - auth_state = AuthMechanism:init(Sock), - connection_state = securing, + State = State0#v1{connection_state = securing, connection = Connection#connection{ client_properties = ClientProperties, - capabilities = Capabilities}}, + capabilities = Capabilities, + auth_mechanism = AuthMechanism, + auth_state = AuthMechanism:init(Sock)}}, auth_phase(Response, State); handle_method0(#'connection.secure_ok'{response = Response}, @@ -790,10 +796,11 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, - connection = Connection = #connection{ - user = User, - protocol = Protocol}, - sock = Sock}) -> + connection = Connection = #connection{ + user = User, + protocol = Protocol}, + sock = Sock, + throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), @@ -801,7 +808,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, State1 = control_throttle( State#v1{connection_state = running, connection = NewConnection, - conserve_resources = Conserve}), + throttle = Throttle#throttle{ + conserve_resources = Conserve}}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), @@ -870,10 +878,10 @@ auth_mechanisms_binary(Sock) -> string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")). auth_phase(Response, - State = #v1{auth_mechanism = AuthMechanism, - auth_state = AuthState, - connection = Connection = - #connection{protocol = Protocol}, + State = #v1{connection = Connection = + #connection{protocol = Protocol, + auth_mechanism = AuthMechanism, + auth_state = AuthState}, sock = Sock}) -> case AuthMechanism:handle_response(Response, AuthState) of {refused, Msg, Args} -> @@ -886,14 +894,16 @@ auth_phase(Response, {challenge, Challenge, AuthState1} -> Secure = #'connection.secure'{challenge = Challenge}, ok = send_on_channel0(Sock, Secure, Protocol), - State#v1{auth_state = AuthState1}; + State#v1{connection = Connection#connection{ + auth_state = AuthState1}}; {ok, User} -> Tune = #'connection.tune'{channel_max = 0, frame_max = server_frame_max(), heartbeat = server_heartbeat()}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, - connection = Connection#connection{user = User}} + connection = Connection#connection{user = User, + auth_state = none}} end. %%-------------------------------------------------------------------------- @@ -901,11 +911,6 @@ auth_phase(Response, infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, #v1{}) -> self(); -i(name, #v1{name = Name}) -> Name; -i(host, #v1{host = Host}) -> Host; -i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost; -i(port, #v1{port = Port}) -> Port; -i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort; i(SockStat, S) when SockStat =:= recv_oct; SockStat =:= recv_cnt; SockStat =:= send_oct; @@ -922,36 +927,32 @@ i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S); i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); i(state, #v1{connection_state = CS}) -> CS; -i(last_blocked_by, #v1{last_blocked_by = By}) -> By; -i(last_blocked_age, #v1{last_blocked_at = never}) -> +i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By; +i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) -> infinity; -i(last_blocked_age, #v1{last_blocked_at = T}) -> +i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) -> timer:now_diff(erlang:now(), T) / 1000000; i(channels, #v1{}) -> length(all_channels()); -i(auth_mechanism, #v1{auth_mechanism = none}) -> +i(Item, #v1{connection = Conn}) -> ic(Item, Conn). + +ic(name, #connection{name = Name}) -> Name; +ic(host, #connection{host = Host}) -> Host; +ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost; +ic(port, #connection{port = Port}) -> Port; +ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort; +ic(protocol, #connection{protocol = none}) -> none; +ic(protocol, #connection{protocol = P}) -> P:version(); +ic(user, #connection{user = none}) -> ''; +ic(user, #connection{user = U}) -> U#user.username; +ic(vhost, #connection{vhost = VHost}) -> VHost; +ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; +ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; +ic(client_properties, #connection{client_properties = CP}) -> CP; +ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; -i(auth_mechanism, #v1{auth_mechanism = Mechanism}) -> +ic(auth_mechanism, #connection{auth_mechanism = Mechanism}) -> proplists:get_value(name, Mechanism:description()); -i(protocol, #v1{connection = #connection{protocol = none}}) -> - none; -i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> - Protocol:version(); -i(user, #v1{connection = #connection{user = none}}) -> - ''; -i(user, #v1{connection = #connection{user = #user{ - username = Username}}}) -> - Username; -i(vhost, #v1{connection = #connection{vhost = VHost}}) -> - VHost; -i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> - Timeout; -i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> - FrameMax; -i(client_properties, #v1{connection = #connection{client_properties = - ClientProperties}}) -> - ClientProperties; -i(Item, #v1{}) -> - throw({bad_argument, Item}). +ic(Item, #connection{}) -> throw({bad_argument, Item}). socket_info(Get, Select, #v1{sock = Sock}) -> case Get(Sock) of diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index b9a7cc15..601656da 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -54,13 +54,15 @@ enabled(VHost) -> {ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS), lists:member(VHost, VHosts). +tap_in(_Msg, none) -> ok; tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) -> - maybe_trace(TraceX, Msg, <<"publish">>, XName, []). + trace(TraceX, Msg, <<"publish">>, XName, []). +tap_out(_Msg, none) -> ok; tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) -> RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, - maybe_trace(TraceX, Msg, <<"deliver">>, QName, - [{<<"redelivered">>, signedint, RedeliveredNum}]). + trace(TraceX, Msg, <<"deliver">>, QName, + [{<<"redelivered">>, signedint, RedeliveredNum}]). %%---------------------------------------------------------------------------- @@ -81,14 +83,11 @@ update_config(Fun) -> %%---------------------------------------------------------------------------- -maybe_trace(none, _Msg, _RKPrefix, _RKSuffix, _Extra) -> +trace(#exchange{name = Name}, #basic_message{exchange_name = Name}, + _RKPrefix, _RKSuffix, _Extra) -> ok; -maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name}, - _RKPrefix, _RKSuffix, _Extra) -> - ok; -maybe_trace(X, Msg = #basic_message{content = #content{ - payload_fragments_rev = PFR}}, - RKPrefix, RKSuffix, Extra) -> +trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}}, + RKPrefix, RKSuffix, Extra) -> {ok, _, _} = rabbit_basic:publish( X, <<RKPrefix/binary, ".", RKSuffix/binary>>, #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index eabfe136..37ca6de0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -596,9 +596,8 @@ fetchwhile(Pred, Fun, Acc, State) -> {undefined, Acc, a(State1)}; {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of - true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {{Msg, _IsDelivered, AckTag}, State3} = - remove(true, MsgStatus1, State2), + true -> {Msg, State2} = read_msg(MsgStatus, false, State1), + {AckTag, State3} = remove(true, MsgStatus, State2), fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3); false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} end @@ -611,9 +610,9 @@ fetch(AckRequired, State) -> {{value, MsgStatus}, State1} -> %% it is possible that the message wasn't read from disk %% at this point, so read it in. - {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {Res, State3} = remove(AckRequired, MsgStatus1, State2), - {Res, a(State3)} + {Msg, State2} = read_msg(MsgStatus, false, State1), + {AckTag, State3} = remove(AckRequired, MsgStatus, State2), + {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)} end. drop(AckRequired, State) -> @@ -621,8 +620,7 @@ drop(AckRequired, State) -> {empty, State1} -> {empty, a(State1)}; {{value, MsgStatus}, State1} -> - {{_Msg, _IsDelivered, AckTag}, State2} = - remove(AckRequired, MsgStatus, State1), + {AckTag, State2} = remove(AckRequired, MsgStatus, State1), {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} end. @@ -674,8 +672,8 @@ ackfold(MsgFun, Acc, State, AckTags) -> {AccN, StateN} = lists:foldl( fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) -> - {#msg_status { msg = Msg }, State1} = - read_msg(gb_trees:get(SeqId, PA), false, State0), + MsgStatus = gb_trees:get(SeqId, PA), + {Msg, State1} = read_msg(MsgStatus, false, State0), {MsgFun(Msg, SeqId, Acc0), State1} end, {Acc, State}, AckTags), {AccN, a(StateN)}. @@ -687,9 +685,9 @@ fold(Fun, Acc, #vqstate { q1 = Q1, q3 = Q3, q4 = Q4 } = State) -> QFun = fun(MsgStatus, {Acc0, State0}) -> - {#msg_status { msg = Msg, msg_props = MsgProps }, State1 } = - read_msg(MsgStatus, false, State0), - {StopGo, AccNext} = Fun(Msg, MsgProps, Acc0), + {Msg, State1} = read_msg(MsgStatus, false, State0), + {StopGo, AccNext} = + Fun(Msg, MsgStatus#msg_status.msg_props, Acc0), {StopGo, {AccNext, State1}} end, {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4), @@ -1074,9 +1072,10 @@ in_r(MsgStatus = #msg_status { msg = undefined }, State = #vqstate { q3 = Q3, q4 = Q4 }) -> case ?QUEUE:is_empty(Q4) of true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; - false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, State), - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) } + false -> {Msg, State1 = #vqstate { q4 = Q4a }} = + read_msg(MsgStatus, true, State), + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { + msg = Msg }, Q4a) } end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1092,20 +1091,18 @@ queue_out(State = #vqstate { q4 = Q4 }) -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end. -read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State). - -read_msg(MsgStatus = #msg_status { msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent }, +read_msg(#msg_status { msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent }, CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount, msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), - {MsgStatus #msg_status { msg = Msg }, - State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam), - msg_store_clients = MSCState1 }}; -read_msg(MsgStatus, _CountDiskToRam, State) -> - {MsgStatus, State}. + RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam), + {Msg, State #vqstate { ram_msg_count = RamMsgCount1, + msg_store_clients = MSCState1 }}; +read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) -> + {Msg, State}. remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, @@ -1149,12 +1146,11 @@ remove(AckRequired, MsgStatus = #msg_status { PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - {{Msg, IsDelivered, AckTag}, - State1 #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len - 1, - persistent_count = PCount1 }}. + {AckTag, State1 #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len - 1, + persistent_count = PCount1 }}. purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, @@ -1375,7 +1371,8 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> - read_msg(MsgStatus, State); + {Msg, State1} = read_msg(MsgStatus, true, State), + {MsgStatus#msg_status { msg = Msg }, State1}; publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) -> {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}. |