summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_event.erl5
-rw-r--r--src/rabbit_reader.erl17
-rw-r--r--src/rabbit_router.erl4
-rw-r--r--src/rabbit_tests.erl2
7 files changed, 22 insertions, 16 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 1fab7e4d..41c628a0 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -90,9 +90,9 @@
{enables, kernel_ready}]}).
-rabbit_boot_step({rabbit_event,
- [{description, "statistics event handler"},
+ [{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
- [gen_event, [{local, rabbit_event}]]}},
+ [rabbit_event]}},
{requires, external_infrastructure},
{enables, kernel_ready}]}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f23fe016..b55d5b21 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -355,7 +355,7 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
-emit_stats(#amqqueue{pid = QPid}) ->
+emit_stats(#amqqueue{pid = QPid}) ->
delegate_pcast(QPid, 7, emit_stats).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5e1b1f71..d52660c5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -241,7 +241,7 @@ stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
-
+
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
@@ -887,7 +887,7 @@ handle_cast(maybe_expire, State) ->
{stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
-
+
handle_cast(emit_stats, State) ->
emit_stats(State),
noreply(State).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 0639b396..113ffcb4 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -33,6 +33,7 @@
-include("rabbit.hrl").
+-export([start_link/0]).
-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
-export([ensure_stats_timer_after/2, reset_stats_timer_after/1]).
-export([stats_level/1]).
@@ -68,6 +69,7 @@
-type(timer_fun() :: fun (() -> 'ok')).
+-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any())).
-spec(init_stats_timer/0 :: () -> state()).
-spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
-spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
@@ -80,6 +82,9 @@
%%----------------------------------------------------------------------------
+start_link() ->
+ gen_event:start_link({local, ?MODULE}).
+
init_stats_timer() ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
#state{level = StatsLevel, timer = undefined}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 532572fd..4b612f2a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -380,8 +380,14 @@ terminate(Explanation, State = #v1{connection_state = running}) ->
terminate(_Explanation, State) ->
{force, State}.
-close_connection(State = #v1{connection = #connection{
+close_connection(State = #v1{queue_collector = Collector,
+ connection = #connection{
timeout_sec = TimeoutSec}}) ->
+ %% The spec says "Exclusive queues may only be accessed by the
+ %% current connection, and are deleted when that connection
+ %% closes." This does not strictly imply synchrony, but in
+ %% practice it seems to be what people assume.
+ rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
TimeoutMillisec =
@@ -457,18 +463,13 @@ wait_for_channel_termination(N, TimerRef) ->
end.
maybe_close(State = #v1{connection_state = closing,
- queue_collector = Collector,
connection = #connection{protocol = Protocol},
sock = Sock}) ->
case all_channels() of
[] ->
- %% Spec says "Exclusive queues may only be accessed by the current
- %% connection, and are deleted when that connection closes."
- %% This does not strictly imply synchrony, but in practice it seems
- %% to be what people assume.
- rabbit_queue_collector:delete_all(Collector),
+ NewState = close_connection(State),
ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
- close_connection(State);
+ NewState;
_ -> State
end;
maybe_close(State) ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index d50b9f31..ec049a1a 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -69,8 +69,8 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
deliver(QPids, Delivery) ->
{Success, _} =
delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
+ fun (Pid) ->
+ rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
lists:foldl(fun fold_deliveries/2, {false, []}, Success),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8d9e4ae4..97960571 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1912,7 +1912,7 @@ with_fresh_variable_queue(Fun) ->
{len, 0}]),
_ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
passed.
-
+
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,