diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-04 10:27:48 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-04 10:27:48 +0000 |
commit | 3f11f949a6fd0dff7dc7373682aa70851613dbd2 (patch) | |
tree | c90d7bd31c139405ba245f91fbe01664ea1939fa | |
parent | 8326fecaca6f864633c3e98fc15df6a081c112d5 (diff) | |
parent | c03bcb90b5ce1d6035ac627500667f9b51a1e4a8 (diff) | |
download | rabbitmq-server-3f11f949a6fd0dff7dc7373682aa70851613dbd2.tar.gz |
Merge in default
-rw-r--r-- | docs/rabbitmq.config.example | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
-rw-r--r-- | src/rabbit_autoheal.erl | 60 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 25 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 9 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 10 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 41 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 78 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 51 |
10 files changed, 166 insertions, 129 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index 7d6b80a7..4d323902 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -494,6 +494,10 @@ %% %% {port, 389}, + %% LDAP connection timeout, in milliseconds or 'infinity' + %% + %% {timeout, infinity}, + %% Enable logging of LDAP queries. %% One of %% - false (no logging is performed) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 25f0a18c..e3202f54 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -861,12 +861,21 @@ prioritise_cast(Msg, _Len, State) -> {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; - {ack, _AckTags, _ChPid} -> consumer_bias(State); + {ack, _AckTags, _ChPid} -> 3; %% [1] + {resume, _ChPid} -> 2; {notify_sent, _ChPid, _Credit} -> consumer_bias(State); - {resume, _ChPid} -> consumer_bias(State); _ -> 0 end. +%% [1] It should be safe to always prioritise ack / resume since they +%% will be rate limited by how fast consumers receive messages - +%% i.e. by notify_sent. We prioritise ack and resume to discourage +%% starvation caused by prioritising notify_sent. We don't vary their +%% prioritiy since acks should stay in order (some parts of the queue +%% stack are optimised for that) and to make things easier to reason +%% about. Finally, we prioritise ack over resume since it should +%% always reduce memory use. + consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:msg_rates(BQS) of {0.0, _} -> 0; diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index a5b91867..3aa32c09 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -16,7 +16,7 @@ -module(rabbit_autoheal). --export([init/0, maybe_start/1, node_down/2, handle_msg/3]). +-export([init/0, maybe_start/1, rabbit_down/2, node_down/2, handle_msg/3]). %% The named process we are running in. -define(SERVER, rabbit_node_monitor). @@ -37,10 +37,13 @@ %% selected as the first node in the cluster. %% %% To coordinate the restarting nodes we pick a special node from the -%% winning partition - the "winner". Restarting nodes then stop, tell -%% the winner they have done so, and wait for it to tell them it is -%% safe to start again. The winner and the leader are not necessarily -%% the same node. +%% winning partition - the "winner". Restarting nodes then stop, and +%% wait for it to tell them it is safe to start again. The winner +%% determines that a node has stopped just by seeing if its rabbit app +%% stops - if a node stops for any other reason it just gets a message +%% it will ignore, and otherwise we carry on. +%% +%% The winner and the leader are not necessarily the same node. %% %% Possible states: %% @@ -75,10 +78,31 @@ maybe_start(State) -> enabled() -> {ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling). -node_down(_Node, {winner_waiting, _Nodes, _Notify} = Autoheal) -> - Autoheal; + +%% This is the winner receiving its last notification that a node has +%% stopped - all nodes can now start again +rabbit_down(Node, {winner_waiting, [Node], Notify}) -> + rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]), + notify_safe(Notify), + not_healing; + +rabbit_down(Node, {winner_waiting, WaitFor, Notify}) -> + {winner_waiting, WaitFor -- [Node], Notify}; + +rabbit_down(_Node, State) -> + %% ignore, we already cancelled the autoheal process + State. + node_down(_Node, not_healing) -> not_healing; + +node_down(Node, {winner_waiting, _, Notify}) -> + rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]), + %% Make sure any nodes waiting for us start - it won't necessarily + %% heal the partition but at least they won't get stuck. + notify_safe(Notify), + not_healing; + node_down(Node, _State) -> rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]), not_healing. @@ -129,7 +153,6 @@ handle_msg({winner_is, Winner}, fun () -> MRef = erlang:monitor(process, {?SERVER, Winner}), rabbit:stop(), - send(Winner, {node_stopped, node()}), receive {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok; autoheal_safe_to_start -> ok @@ -139,30 +162,17 @@ handle_msg({winner_is, Winner}, end), restarting; -%% This is the winner receiving its last notification that a node has -%% stopped - all nodes can now start again -handle_msg({node_stopped, Node}, - {winner_waiting, [Node], Notify}, _Partitions) -> - rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]), - [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], - not_healing; - -handle_msg({node_stopped, Node}, - {winner_waiting, WaitFor, Notify}, _Partitions) -> - {winner_waiting, WaitFor -- [Node], Notify}; - handle_msg(_, restarting, _Partitions) -> %% ignore, we can contribute no further - restarting; - -handle_msg({node_stopped, _Node}, State, _Partitions) -> - %% ignore, we already cancelled the autoheal process - State. + restarting. %%---------------------------------------------------------------------------- send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}. +notify_safe(Notify) -> + [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify]. + make_decision(AllPartitions) -> Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]), [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index b272c64f..7bf6bd4a 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -139,8 +139,8 @@ sync_mirrors(HandleInfo, EmitStats, backing_queue = BQ, backing_queue_state = BQS }) -> Log = fun (Fmt, Params) -> - rabbit_log:info("Synchronising ~s: " ++ Fmt ++ "~n", - [rabbit_misc:rs(QName) | Params]) + rabbit_mirror_queue_misc:log_info( + QName, "Synchronising ~s: " ++ Fmt ++ "~n", Params) end, Log("~p messages to synchronise", [BQ:len(BQS)]), {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 4f77009c..4e9d5aef 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,7 +20,7 @@ -export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2, validate_policy/1, - maybe_auto_sync/1]). + maybe_auto_sync/1, log_info/3, log_warning/3]). %% for testing only -export([module/1]). @@ -56,6 +56,8 @@ -spec(update_mirrors/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(log_info/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok'). +-spec(log_warning/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok'). -endif. @@ -156,9 +158,8 @@ drop_mirror(QName, MirrorNode) -> [QPid] when SPids =:= [] -> {error, cannot_drop_only_mirror}; [Pid] -> - rabbit_log:info( - "Dropping queue mirror on node ~p for ~s~n", - [MirrorNode, rabbit_misc:rs(Name)]), + log_info(Name, "Dropping queue mirror on node ~p~n", + [MirrorNode]), exit(Pid, {shutdown, dropped}), {ok, dropped} end; @@ -192,23 +193,29 @@ start_child(Name, MirrorNode, Q, SyncMode) -> fun () -> {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child( MirrorNode, [Q]), - rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), + log_info(Name, "Adding mirror on node ~p: ~p~n", + [MirrorNode, SPid]), rabbit_mirror_queue_slave:go(SPid, SyncMode) end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> - rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n", - [rabbit_misc:rs(QueueName), - case IsMaster of + log_info(QueueName, "~s ~s saw deaths of mirrors ~s~n", + [case IsMaster of true -> "Master"; false -> "Slave" end, rabbit_misc:pid_to_string(MirrorPid), [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). +log_info (QName, Fmt, Args) -> log(info, QName, Fmt, Args). +log_warning(QName, Fmt, Args) -> log(warning, QName, Fmt, Args). + +log(Level, QName, Fmt, Args) -> + rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt, + [rabbit_misc:rs(QName) | Args]). + store_updated_slaves(Q = #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}) -> %% TODO now that we clear sync_slave_pids in rabbit_durable_queue, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1f31b5c8..37d2e5b6 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -136,9 +136,8 @@ handle_go(Q = #amqqueue{name = QName}) -> rabbit_mirror_queue_misc:maybe_auto_sync(Q1), {ok, State}; {stale, StalePid} -> - rabbit_log:warning("Detected stale HA master while adding " - "mirror of ~s: ~p~n", - [rabbit_misc:rs(QName), StalePid]), + rabbit_mirror_queue_misc:log_warning( + QName, "Detected stale HA master: ~p~n", [StalePid]), gm:leave(GM), {error, {stale_master_pid, StalePid}}; duplicate_live_master -> @@ -526,8 +525,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, msg_id_ack = MA, msg_id_status = MS, known_senders = KS }) -> - rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", - [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), + rabbit_mirror_queue_misc:log_info(QName, "Promoting slave ~s to master~n", + [rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q1, GM, rabbit_mirror_queue_master:sender_death_fun(), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 848c4a87..ab1c6063 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -685,7 +685,7 @@ pid_to_string(Pid) when is_pid(Pid) -> <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> = term_to_binary(Pid), Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), - format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser]). + format("<~s.~B.~B.~B>", [Node, Cre, Id, Ser]). %% inverse of above string_to_pid(Str) -> @@ -695,13 +695,7 @@ string_to_pid(Str) -> case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$", [{capture,all_but_first,list}]) of {match, [NodeStr, CreStr, IdStr, SerStr]} -> - %% the NodeStr atom might be quoted, so we have to parse - %% it rather than doing a simple list_to_atom - NodeAtom = case erl_scan:string(NodeStr) of - {ok, [{atom, _, X}], _} -> X; - {error, _, _} -> throw(Err) - end, - <<131,NodeEnc/binary>> = term_to_binary(NodeAtom), + <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)), [Cre, Id, Ser] = lists:map(fun list_to_integer/1, [CreStr, IdStr, SerStr]), binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>); diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 488f1df5..46dbd7b7 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -257,9 +257,8 @@ handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, rabbit_log:info("rabbit on node ~p down~n", [Node]), {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}), - ok = handle_dead_rabbit(Node), [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)], - {noreply, handle_dead_rabbit_state( + {noreply, handle_dead_rabbit( Node, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})}; @@ -270,8 +269,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, handle_info({nodedown, Node, Info}, State) -> rabbit_log:info("node ~p down: ~p~n", [Node, proplists:get_value(nodedown_reason, Info)]), - ok = handle_dead_node(Node), - {noreply, State}; + {noreply, handle_dead_node(Node, State)}; handle_info({mnesia_system_event, {inconsistent_database, running_partitioned_network, Node}}, @@ -333,17 +331,7 @@ code_change(_OldVsn, State, _Extra) -> %% Functions that call the module specific hooks when nodes go up/down %%---------------------------------------------------------------------------- -%% TODO: This may turn out to be a performance hog when there are lots -%% of nodes. We really only need to execute some of these statements -%% on *one* node, rather than all of them. -handle_dead_rabbit(Node) -> - ok = rabbit_networking:on_node_down(Node), - ok = rabbit_amqqueue:on_node_down(Node), - ok = rabbit_alarm:on_node_down(Node), - ok = rabbit_mnesia:on_node_down(Node), - ok. - -handle_dead_node(_Node) -> +handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> %% In general in rabbit_node_monitor we care about whether the %% rabbit application is up rather than the node; we do this so %% that we can respond in the same way to "rabbitmqctl stop_app" @@ -356,17 +344,17 @@ handle_dead_node(_Node) -> case application:get_env(rabbit, cluster_partition_handling) of {ok, pause_minority} -> case majority() of - true -> ok; - false -> await_cluster_recovery() + true -> State; + false -> await_cluster_recovery() %% Does not really return end; {ok, ignore} -> - ok; + State; {ok, autoheal} -> - ok; + State#state{autoheal = rabbit_autoheal:node_down(Node, Autoheal)}; {ok, Term} -> rabbit_log:warning("cluster_partition_handling ~p unrecognised, " "assuming 'ignore'~n", [Term]), - ok + State end. await_cluster_recovery() -> @@ -399,8 +387,15 @@ wait_for_cluster_recovery(Nodes) -> wait_for_cluster_recovery(Nodes) end. -handle_dead_rabbit_state(Node, State = #state{partitions = Partitions, - autoheal = Autoheal}) -> +handle_dead_rabbit(Node, State = #state{partitions = Partitions, + autoheal = Autoheal}) -> + %% TODO: This may turn out to be a performance hog when there are + %% lots of nodes. We really only need to execute some of these + %% statements on *one* node, rather than all of them. + ok = rabbit_networking:on_node_down(Node), + ok = rabbit_amqqueue:on_node_down(Node), + ok = rabbit_alarm:on_node_down(Node), + ok = rabbit_mnesia:on_node_down(Node), %% If we have been partitioned, and we are now in the only remaining %% partition, we no longer care about partitions - forget them. Note %% that we do not attempt to deal with individual (other) partitions @@ -412,7 +407,7 @@ handle_dead_rabbit_state(Node, State = #state{partitions = Partitions, end, ensure_ping_timer( State#state{partitions = Partitions1, - autoheal = rabbit_autoheal:node_down(Node, Autoheal)}). + autoheal = rabbit_autoheal:rabbit_down(Node, Autoheal)}). ensure_ping_timer(State) -> rabbit_misc:ensure_timer( diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9efb5c96..25bee173 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -45,8 +45,7 @@ client_properties, capabilities, auth_mechanism, auth_state}). --record(throttle, {alarmed_by, last_blocked_by, last_blocked_at, - blocked_sent}). +-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -249,8 +248,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> throttle = #throttle{ alarmed_by = [], last_blocked_by = none, - last_blocked_at = never, - blocked_sent = false}}, + last_blocked_at = never}}, try run({?MODULE, recvloop, [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer( @@ -339,14 +337,19 @@ stop(Reason, State) -> maybe_emit_stats(State), throw({inet_error, Reason}). handle_other({conserve_resources, Source, Conserve}, - State = #v1{throttle = Throttle = - #throttle{alarmed_by = CR}}) -> + State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) -> CR1 = case Conserve of true -> lists:usort([Source | CR]); false -> CR -- [Source] end, - Throttle1 = Throttle#throttle{alarmed_by = CR1}, - control_throttle(State#v1{throttle = Throttle1}); + State1 = control_throttle( + State#v1{throttle = Throttle#throttle{alarmed_by = CR1}}), + case {blocked_by_alarm(State), blocked_by_alarm(State1)} of + {false, true} -> ok = send_blocked(State1); + {true, false} -> ok = send_unblocked(State1); + {_, _} -> ok + end, + State1; handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), {_, State1} = channel_cleanup(ChPid, State), @@ -439,10 +442,7 @@ control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), - maybe_send_unblocked(State), - State#v1{connection_state = running, - throttle = Throttle#throttle{ - blocked_sent = false}}; + State#v1{connection_state = running}; {blocked, true} -> State#v1{throttle = update_last_blocked_by( Throttle)}; {_, _} -> State @@ -451,37 +451,49 @@ control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), - Sent = maybe_send_blocked(State), - State#v1{connection_state = blocked, - throttle = update_last_blocked_by( - Throttle#throttle{last_blocked_at = erlang:now(), - blocked_sent = Sent})}; + State1 = State#v1{connection_state = blocked, + throttle = update_last_blocked_by( + Throttle#throttle{ + last_blocked_at = erlang:now()})}, + case {blocked_by_alarm(State), blocked_by_alarm(State1)} of + {false, true} -> ok = send_blocked(State1); + {_, _} -> ok + end, + State1; maybe_block(State) -> State. -maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) -> - false; -maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, - connection = #connection{ - protocol = Protocol, - capabilities = Capabilities}, - sock = Sock}) -> + +blocked_by_alarm(#v1{connection_state = blocked, + throttle = #throttle{alarmed_by = CR}}) + when CR =/= [] -> + true; +blocked_by_alarm(#v1{}) -> + false. + +send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, + connection = #connection{protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of {bool, true} -> RStr = string:join([atom_to_list(A) || A <- CR], " & "), Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])), ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, - Protocol), - true; + Protocol); _ -> - false + ok end. -maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) -> - ok; -maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol}, - sock = Sock}) -> - ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol). +send_unblocked(#v1{connection = #connection{protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> + case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of + {bool, true} -> + ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol); + _ -> + ok + end. update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) -> Throttle#throttle{last_blocked_by = flow}; @@ -1163,7 +1175,7 @@ become_1_0(Id, State = #v1{sock = Sock}) -> {rabbit_amqp1_0_reader, init, [Mode, pack_for_1_0(Buf, BufLen, S)]} end, - State = #v1{connection_state = {become, F}} + State#v1{connection_state = {become, F}} end. pack_for_1_0(Buf, BufLen, #v1{parent = Parent, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 321af4ac..995c7319 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -558,10 +558,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, in_counter = InCount1, persistent_count = PCount1, unconfirmed = UC1 }), - a(reduce_memory_use(case InCount1 > ?MSGS_PER_RATE_CALC of - true -> update_rates(State3); - false -> State3 - end)). + a(reduce_memory_use(maybe_update_rates(State3))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -579,12 +576,12 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - {SeqId, a(reduce_memory_use( - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }))}. + State3 = State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = UC1 }, + {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. discard(_MsgId, _ChPid, State) -> State. @@ -704,11 +701,12 @@ requeue(AckTags, #vqstate { delta = Delta, State2), MsgCount = length(MsgIds2), {MsgIds2, a(reduce_memory_use( - State3 #vqstate { delta = Delta1, - q3 = Q3a, - q4 = Q4a, - in_counter = InCounter + MsgCount, - len = Len + MsgCount }))}. + maybe_update_rates( + State3 #vqstate { delta = Delta1, + q3 = Q3a, + q4 = Q4a, + in_counter = InCounter + MsgCount, + len = Len + MsgCount })))}. ackfold(MsgFun, Acc, State, AckTags) -> {AccN, StateN} = @@ -755,6 +753,13 @@ set_ram_duration_target( false -> reduce_memory_use(State1) end). +maybe_update_rates(State = #vqstate{ in_counter = InCount, + out_counter = OutCount }) + when InCount + OutCount > ?MSGS_PER_RATE_CALC -> + update_rates(State); +maybe_update_rates(State) -> + State. + update_rates(State = #vqstate{ in_counter = InCount, out_counter = OutCount, ack_in_counter = AckInCount, @@ -796,8 +801,9 @@ ram_duration(State) -> RamAckCount = gb_trees:size(RPA), Duration = %% msgs+acks / (msgs+acks/sec) == sec - case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso - AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of + case lists:all(fun (X) -> X < 0.01 end, + [AvgEgressRate, AvgIngressRate, + AvgAckEgressRate, AvgAckIngressRate]) of true -> infinity; false -> (RamMsgCountPrev + RamMsgCount + RamAckCount + RamAckCountPrev) / @@ -1179,11 +1185,12 @@ remove(AckRequired, MsgStatus = #msg_status { PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - {AckTag, State1 #vqstate {ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len - 1, - persistent_count = PCount1}}. + {AckTag, maybe_update_rates( + State1 #vqstate {ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len - 1, + persistent_count = PCount1})}. purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, |