summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-04 10:27:48 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-04 10:27:48 +0000
commit3f11f949a6fd0dff7dc7373682aa70851613dbd2 (patch)
treec90d7bd31c139405ba245f91fbe01664ea1939fa
parent8326fecaca6f864633c3e98fc15df6a081c112d5 (diff)
parentc03bcb90b5ce1d6035ac627500667f9b51a1e4a8 (diff)
downloadrabbitmq-server-3f11f949a6fd0dff7dc7373682aa70851613dbd2.tar.gz
Merge in default
-rw-r--r--docs/rabbitmq.config.example4
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_autoheal.erl60
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_misc.erl25
-rw-r--r--src/rabbit_mirror_queue_slave.erl9
-rw-r--r--src/rabbit_misc.erl10
-rw-r--r--src/rabbit_node_monitor.erl41
-rw-r--r--src/rabbit_reader.erl78
-rw-r--r--src/rabbit_variable_queue.erl51
10 files changed, 166 insertions, 129 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index 7d6b80a7..4d323902 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -494,6 +494,10 @@
%%
%% {port, 389},
+ %% LDAP connection timeout, in milliseconds or 'infinity'
+ %%
+ %% {timeout, infinity},
+
%% Enable logging of LDAP queries.
%% One of
%% - false (no logging is performed)
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 25f0a18c..e3202f54 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -861,12 +861,21 @@ prioritise_cast(Msg, _Len, State) ->
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
- {ack, _AckTags, _ChPid} -> consumer_bias(State);
+ {ack, _AckTags, _ChPid} -> 3; %% [1]
+ {resume, _ChPid} -> 2;
{notify_sent, _ChPid, _Credit} -> consumer_bias(State);
- {resume, _ChPid} -> consumer_bias(State);
_ -> 0
end.
+%% [1] It should be safe to always prioritise ack / resume since they
+%% will be rate limited by how fast consumers receive messages -
+%% i.e. by notify_sent. We prioritise ack and resume to discourage
+%% starvation caused by prioritising notify_sent. We don't vary their
+%% prioritiy since acks should stay in order (some parts of the queue
+%% stack are optimised for that) and to make things easier to reason
+%% about. Finally, we prioritise ack over resume since it should
+%% always reduce memory use.
+
consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:msg_rates(BQS) of
{0.0, _} -> 0;
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index a5b91867..3aa32c09 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -16,7 +16,7 @@
-module(rabbit_autoheal).
--export([init/0, maybe_start/1, node_down/2, handle_msg/3]).
+-export([init/0, maybe_start/1, rabbit_down/2, node_down/2, handle_msg/3]).
%% The named process we are running in.
-define(SERVER, rabbit_node_monitor).
@@ -37,10 +37,13 @@
%% selected as the first node in the cluster.
%%
%% To coordinate the restarting nodes we pick a special node from the
-%% winning partition - the "winner". Restarting nodes then stop, tell
-%% the winner they have done so, and wait for it to tell them it is
-%% safe to start again. The winner and the leader are not necessarily
-%% the same node.
+%% winning partition - the "winner". Restarting nodes then stop, and
+%% wait for it to tell them it is safe to start again. The winner
+%% determines that a node has stopped just by seeing if its rabbit app
+%% stops - if a node stops for any other reason it just gets a message
+%% it will ignore, and otherwise we carry on.
+%%
+%% The winner and the leader are not necessarily the same node.
%%
%% Possible states:
%%
@@ -75,10 +78,31 @@ maybe_start(State) ->
enabled() ->
{ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling).
-node_down(_Node, {winner_waiting, _Nodes, _Notify} = Autoheal) ->
- Autoheal;
+
+%% This is the winner receiving its last notification that a node has
+%% stopped - all nodes can now start again
+rabbit_down(Node, {winner_waiting, [Node], Notify}) ->
+ rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
+ notify_safe(Notify),
+ not_healing;
+
+rabbit_down(Node, {winner_waiting, WaitFor, Notify}) ->
+ {winner_waiting, WaitFor -- [Node], Notify};
+
+rabbit_down(_Node, State) ->
+ %% ignore, we already cancelled the autoheal process
+ State.
+
node_down(_Node, not_healing) ->
not_healing;
+
+node_down(Node, {winner_waiting, _, Notify}) ->
+ rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
+ %% Make sure any nodes waiting for us start - it won't necessarily
+ %% heal the partition but at least they won't get stuck.
+ notify_safe(Notify),
+ not_healing;
+
node_down(Node, _State) ->
rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
not_healing.
@@ -129,7 +153,6 @@ handle_msg({winner_is, Winner},
fun () ->
MRef = erlang:monitor(process, {?SERVER, Winner}),
rabbit:stop(),
- send(Winner, {node_stopped, node()}),
receive
{'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok;
autoheal_safe_to_start -> ok
@@ -139,30 +162,17 @@ handle_msg({winner_is, Winner},
end),
restarting;
-%% This is the winner receiving its last notification that a node has
-%% stopped - all nodes can now start again
-handle_msg({node_stopped, Node},
- {winner_waiting, [Node], Notify}, _Partitions) ->
- rabbit_log:info("Autoheal: final node has stopped, starting...~n",[]),
- [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
- not_healing;
-
-handle_msg({node_stopped, Node},
- {winner_waiting, WaitFor, Notify}, _Partitions) ->
- {winner_waiting, WaitFor -- [Node], Notify};
-
handle_msg(_, restarting, _Partitions) ->
%% ignore, we can contribute no further
- restarting;
-
-handle_msg({node_stopped, _Node}, State, _Partitions) ->
- %% ignore, we already cancelled the autoheal process
- State.
+ restarting.
%%----------------------------------------------------------------------------
send(Node, Msg) -> {?SERVER, Node} ! {autoheal_msg, Msg}.
+notify_safe(Notify) ->
+ [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify].
+
make_decision(AllPartitions) ->
Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
[[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b272c64f..7bf6bd4a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -139,8 +139,8 @@ sync_mirrors(HandleInfo, EmitStats,
backing_queue = BQ,
backing_queue_state = BQS }) ->
Log = fun (Fmt, Params) ->
- rabbit_log:info("Synchronising ~s: " ++ Fmt ++ "~n",
- [rabbit_misc:rs(QName) | Params])
+ rabbit_mirror_queue_misc:log_info(
+ QName, "Synchronising ~s: " ++ Fmt ++ "~n", Params)
end,
Log("~p messages to synchronise", [BQ:len(BQS)]),
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 4f77009c..4e9d5aef 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -20,7 +20,7 @@
-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, validate_policy/1,
- maybe_auto_sync/1]).
+ maybe_auto_sync/1, log_info/3, log_warning/3]).
%% for testing only
-export([module/1]).
@@ -56,6 +56,8 @@
-spec(update_mirrors/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(log_info/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok').
+-spec(log_warning/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok').
-endif.
@@ -156,9 +158,8 @@ drop_mirror(QName, MirrorNode) ->
[QPid] when SPids =:= [] ->
{error, cannot_drop_only_mirror};
[Pid] ->
- rabbit_log:info(
- "Dropping queue mirror on node ~p for ~s~n",
- [MirrorNode, rabbit_misc:rs(Name)]),
+ log_info(Name, "Dropping queue mirror on node ~p~n",
+ [MirrorNode]),
exit(Pid, {shutdown, dropped}),
{ok, dropped}
end;
@@ -192,23 +193,29 @@ start_child(Name, MirrorNode, Q, SyncMode) ->
fun () ->
{ok, SPid} = rabbit_mirror_queue_slave_sup:start_child(
MirrorNode, [Q]),
- rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ log_info(Name, "Adding mirror on node ~p: ~p~n",
+ [MirrorNode, SPid]),
rabbit_mirror_queue_slave:go(SPid, SyncMode)
end).
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
- rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n",
- [rabbit_misc:rs(QueueName),
- case IsMaster of
+ log_info(QueueName, "~s ~s saw deaths of mirrors ~s~n",
+ [case IsMaster of
true -> "Master";
false -> "Slave"
end,
rabbit_misc:pid_to_string(MirrorPid),
[[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]).
+log_info (QName, Fmt, Args) -> log(info, QName, Fmt, Args).
+log_warning(QName, Fmt, Args) -> log(warning, QName, Fmt, Args).
+
+log(Level, QName, Fmt, Args) ->
+ rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt,
+ [rabbit_misc:rs(QName) | Args]).
+
store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
sync_slave_pids = SSPids}) ->
%% TODO now that we clear sync_slave_pids in rabbit_durable_queue,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1f31b5c8..37d2e5b6 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -136,9 +136,8 @@ handle_go(Q = #amqqueue{name = QName}) ->
rabbit_mirror_queue_misc:maybe_auto_sync(Q1),
{ok, State};
{stale, StalePid} ->
- rabbit_log:warning("Detected stale HA master while adding "
- "mirror of ~s: ~p~n",
- [rabbit_misc:rs(QName), StalePid]),
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Detected stale HA master: ~p~n", [StalePid]),
gm:leave(GM),
{error, {stale_master_pid, StalePid}};
duplicate_live_master ->
@@ -526,8 +525,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
msg_id_ack = MA,
msg_id_status = MS,
known_senders = KS }) ->
- rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
- [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
+ rabbit_mirror_queue_misc:log_info(QName, "Promoting slave ~s to master~n",
+ [rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 848c4a87..ab1c6063 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -685,7 +685,7 @@ pid_to_string(Pid) when is_pid(Pid) ->
<<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
= term_to_binary(Pid),
Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser]).
+ format("<~s.~B.~B.~B>", [Node, Cre, Id, Ser]).
%% inverse of above
string_to_pid(Str) ->
@@ -695,13 +695,7 @@ string_to_pid(Str) ->
case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$",
[{capture,all_but_first,list}]) of
{match, [NodeStr, CreStr, IdStr, SerStr]} ->
- %% the NodeStr atom might be quoted, so we have to parse
- %% it rather than doing a simple list_to_atom
- NodeAtom = case erl_scan:string(NodeStr) of
- {ok, [{atom, _, X}], _} -> X;
- {error, _, _} -> throw(Err)
- end,
- <<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
+ <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)),
[Cre, Id, Ser] = lists:map(fun list_to_integer/1,
[CreStr, IdStr, SerStr]),
binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>);
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 488f1df5..46dbd7b7 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -257,9 +257,8 @@ handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
rabbit_log:info("rabbit on node ~p down~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
- ok = handle_dead_rabbit(Node),
[P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
- {noreply, handle_dead_rabbit_state(
+ {noreply, handle_dead_rabbit(
Node,
State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};
@@ -270,8 +269,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
handle_info({nodedown, Node, Info}, State) ->
rabbit_log:info("node ~p down: ~p~n",
[Node, proplists:get_value(nodedown_reason, Info)]),
- ok = handle_dead_node(Node),
- {noreply, State};
+ {noreply, handle_dead_node(Node, State)};
handle_info({mnesia_system_event,
{inconsistent_database, running_partitioned_network, Node}},
@@ -333,17 +331,7 @@ code_change(_OldVsn, State, _Extra) ->
%% Functions that call the module specific hooks when nodes go up/down
%%----------------------------------------------------------------------------
-%% TODO: This may turn out to be a performance hog when there are lots
-%% of nodes. We really only need to execute some of these statements
-%% on *one* node, rather than all of them.
-handle_dead_rabbit(Node) ->
- ok = rabbit_networking:on_node_down(Node),
- ok = rabbit_amqqueue:on_node_down(Node),
- ok = rabbit_alarm:on_node_down(Node),
- ok = rabbit_mnesia:on_node_down(Node),
- ok.
-
-handle_dead_node(_Node) ->
+handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
%% In general in rabbit_node_monitor we care about whether the
%% rabbit application is up rather than the node; we do this so
%% that we can respond in the same way to "rabbitmqctl stop_app"
@@ -356,17 +344,17 @@ handle_dead_node(_Node) ->
case application:get_env(rabbit, cluster_partition_handling) of
{ok, pause_minority} ->
case majority() of
- true -> ok;
- false -> await_cluster_recovery()
+ true -> State;
+ false -> await_cluster_recovery() %% Does not really return
end;
{ok, ignore} ->
- ok;
+ State;
{ok, autoheal} ->
- ok;
+ State#state{autoheal = rabbit_autoheal:node_down(Node, Autoheal)};
{ok, Term} ->
rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
"assuming 'ignore'~n", [Term]),
- ok
+ State
end.
await_cluster_recovery() ->
@@ -399,8 +387,15 @@ wait_for_cluster_recovery(Nodes) ->
wait_for_cluster_recovery(Nodes)
end.
-handle_dead_rabbit_state(Node, State = #state{partitions = Partitions,
- autoheal = Autoheal}) ->
+handle_dead_rabbit(Node, State = #state{partitions = Partitions,
+ autoheal = Autoheal}) ->
+ %% TODO: This may turn out to be a performance hog when there are
+ %% lots of nodes. We really only need to execute some of these
+ %% statements on *one* node, rather than all of them.
+ ok = rabbit_networking:on_node_down(Node),
+ ok = rabbit_amqqueue:on_node_down(Node),
+ ok = rabbit_alarm:on_node_down(Node),
+ ok = rabbit_mnesia:on_node_down(Node),
%% If we have been partitioned, and we are now in the only remaining
%% partition, we no longer care about partitions - forget them. Note
%% that we do not attempt to deal with individual (other) partitions
@@ -412,7 +407,7 @@ handle_dead_rabbit_state(Node, State = #state{partitions = Partitions,
end,
ensure_ping_timer(
State#state{partitions = Partitions1,
- autoheal = rabbit_autoheal:node_down(Node, Autoheal)}).
+ autoheal = rabbit_autoheal:rabbit_down(Node, Autoheal)}).
ensure_ping_timer(State) ->
rabbit_misc:ensure_timer(
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 9efb5c96..25bee173 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -45,8 +45,7 @@
client_properties, capabilities,
auth_mechanism, auth_state}).
--record(throttle, {alarmed_by, last_blocked_by, last_blocked_at,
- blocked_sent}).
+-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -249,8 +248,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
throttle = #throttle{
alarmed_by = [],
last_blocked_by = none,
- last_blocked_at = never,
- blocked_sent = false}},
+ last_blocked_at = never}},
try
run({?MODULE, recvloop,
[Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
@@ -339,14 +337,19 @@ stop(Reason, State) -> maybe_emit_stats(State),
throw({inet_error, Reason}).
handle_other({conserve_resources, Source, Conserve},
- State = #v1{throttle = Throttle =
- #throttle{alarmed_by = CR}}) ->
+ State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) ->
CR1 = case Conserve of
true -> lists:usort([Source | CR]);
false -> CR -- [Source]
end,
- Throttle1 = Throttle#throttle{alarmed_by = CR1},
- control_throttle(State#v1{throttle = Throttle1});
+ State1 = control_throttle(
+ State#v1{throttle = Throttle#throttle{alarmed_by = CR1}}),
+ case {blocked_by_alarm(State), blocked_by_alarm(State1)} of
+ {false, true} -> ok = send_blocked(State1);
+ {true, false} -> ok = send_unblocked(State1);
+ {_, _} -> ok
+ end,
+ State1;
handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
{_, State1} = channel_cleanup(ChPid, State),
@@ -439,10 +442,7 @@ control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
- maybe_send_unblocked(State),
- State#v1{connection_state = running,
- throttle = Throttle#throttle{
- blocked_sent = false}};
+ State#v1{connection_state = running};
{blocked, true} -> State#v1{throttle = update_last_blocked_by(
Throttle)};
{_, _} -> State
@@ -451,37 +451,49 @@ control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
maybe_block(State = #v1{connection_state = blocking,
throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- Sent = maybe_send_blocked(State),
- State#v1{connection_state = blocked,
- throttle = update_last_blocked_by(
- Throttle#throttle{last_blocked_at = erlang:now(),
- blocked_sent = Sent})};
+ State1 = State#v1{connection_state = blocked,
+ throttle = update_last_blocked_by(
+ Throttle#throttle{
+ last_blocked_at = erlang:now()})},
+ case {blocked_by_alarm(State), blocked_by_alarm(State1)} of
+ {false, true} -> ok = send_blocked(State1);
+ {_, _} -> ok
+ end,
+ State1;
maybe_block(State) ->
State.
-maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) ->
- false;
-maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
- connection = #connection{
- protocol = Protocol,
- capabilities = Capabilities},
- sock = Sock}) ->
+
+blocked_by_alarm(#v1{connection_state = blocked,
+ throttle = #throttle{alarmed_by = CR}})
+ when CR =/= [] ->
+ true;
+blocked_by_alarm(#v1{}) ->
+ false.
+
+send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
+ connection = #connection{protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
{bool, true} ->
RStr = string:join([atom_to_list(A) || A <- CR], " & "),
Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])),
ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
- Protocol),
- true;
+ Protocol);
_ ->
- false
+ ok
end.
-maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) ->
- ok;
-maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol},
- sock = Sock}) ->
- ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol).
+send_unblocked(#v1{connection = #connection{protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+ ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol);
+ _ ->
+ ok
+ end.
update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) ->
Throttle#throttle{last_blocked_by = flow};
@@ -1163,7 +1175,7 @@ become_1_0(Id, State = #v1{sock = Sock}) ->
{rabbit_amqp1_0_reader, init,
[Mode, pack_for_1_0(Buf, BufLen, S)]}
end,
- State = #v1{connection_state = {become, F}}
+ State#v1{connection_state = {become, F}}
end.
pack_for_1_0(Buf, BufLen, #v1{parent = Parent,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 321af4ac..995c7319 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -558,10 +558,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
in_counter = InCount1,
persistent_count = PCount1,
unconfirmed = UC1 }),
- a(reduce_memory_use(case InCount1 > ?MSGS_PER_RATE_CALC of
- true -> update_rates(State3);
- false -> State3
- end)).
+ a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -579,12 +576,12 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- {SeqId, a(reduce_memory_use(
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 }))}.
+ State3 = State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 },
+ {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
discard(_MsgId, _ChPid, State) -> State.
@@ -704,11 +701,12 @@ requeue(AckTags, #vqstate { delta = Delta,
State2),
MsgCount = length(MsgIds2),
{MsgIds2, a(reduce_memory_use(
- State3 #vqstate { delta = Delta1,
- q3 = Q3a,
- q4 = Q4a,
- in_counter = InCounter + MsgCount,
- len = Len + MsgCount }))}.
+ maybe_update_rates(
+ State3 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ q4 = Q4a,
+ in_counter = InCounter + MsgCount,
+ len = Len + MsgCount })))}.
ackfold(MsgFun, Acc, State, AckTags) ->
{AccN, StateN} =
@@ -755,6 +753,13 @@ set_ram_duration_target(
false -> reduce_memory_use(State1)
end).
+maybe_update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount })
+ when InCount + OutCount > ?MSGS_PER_RATE_CALC ->
+ update_rates(State);
+maybe_update_rates(State) ->
+ State.
+
update_rates(State = #vqstate{ in_counter = InCount,
out_counter = OutCount,
ack_in_counter = AckInCount,
@@ -796,8 +801,9 @@ ram_duration(State) ->
RamAckCount = gb_trees:size(RPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
- case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
- AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of
+ case lists:all(fun (X) -> X < 0.01 end,
+ [AvgEgressRate, AvgIngressRate,
+ AvgAckEgressRate, AvgAckIngressRate]) of
true -> infinity;
false -> (RamMsgCountPrev + RamMsgCount +
RamAckCount + RamAckCountPrev) /
@@ -1179,11 +1185,12 @@ remove(AckRequired, MsgStatus = #msg_status {
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {AckTag, State1 #vqstate {ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len - 1,
- persistent_count = PCount1}}.
+ {AckTag, maybe_update_rates(
+ State1 #vqstate {ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len - 1,
+ persistent_count = PCount1})}.
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,