diff options
-rw-r--r-- | src/rabbit.erl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | src/rabbit_event.erl | 5 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 17 | ||||
-rw-r--r-- | src/rabbit_router.erl | 4 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
7 files changed, 22 insertions, 16 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 1fab7e4d..41c628a0 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -90,9 +90,9 @@ {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_event, - [{description, "statistics event handler"}, + [{description, "statistics event manager"}, {mfa, {rabbit_sup, start_restartable_child, - [gen_event, [{local, rabbit_event}]]}}, + [rabbit_event]}}, {requires, external_infrastructure}, {enables, kernel_ready}]}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f23fe016..b55d5b21 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -355,7 +355,7 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). -emit_stats(#amqqueue{pid = QPid}) -> +emit_stats(#amqqueue{pid = QPid}) -> delegate_pcast(QPid, 7, emit_stats). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5e1b1f71..d52660c5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -241,7 +241,7 @@ stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. - + stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> @@ -887,7 +887,7 @@ handle_cast(maybe_expire, State) -> {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) end; - + handle_cast(emit_stats, State) -> emit_stats(State), noreply(State). diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 0639b396..113ffcb4 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -33,6 +33,7 @@ -include("rabbit.hrl"). +-export([start_link/0]). -export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]). -export([ensure_stats_timer_after/2, reset_stats_timer_after/1]). -export([stats_level/1]). @@ -68,6 +69,7 @@ -type(timer_fun() :: fun (() -> 'ok')). +-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any())). -spec(init_stats_timer/0 :: () -> state()). -spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()). -spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()). @@ -80,6 +82,9 @@ %%---------------------------------------------------------------------------- +start_link() -> + gen_event:start_link({local, ?MODULE}). + init_stats_timer() -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), #state{level = StatsLevel, timer = undefined}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 532572fd..4b612f2a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -380,8 +380,14 @@ terminate(Explanation, State = #v1{connection_state = running}) -> terminate(_Explanation, State) -> {force, State}. -close_connection(State = #v1{connection = #connection{ +close_connection(State = #v1{queue_collector = Collector, + connection = #connection{ timeout_sec = TimeoutSec}}) -> + %% The spec says "Exclusive queues may only be accessed by the + %% current connection, and are deleted when that connection + %% closes." This does not strictly imply synchrony, but in + %% practice it seems to be what people assume. + rabbit_queue_collector:delete_all(Collector), %% We terminate the connection after the specified interval, but %% no later than ?CLOSING_TIMEOUT seconds. TimeoutMillisec = @@ -457,18 +463,13 @@ wait_for_channel_termination(N, TimerRef) -> end. maybe_close(State = #v1{connection_state = closing, - queue_collector = Collector, connection = #connection{protocol = Protocol}, sock = Sock}) -> case all_channels() of [] -> - %% Spec says "Exclusive queues may only be accessed by the current - %% connection, and are deleted when that connection closes." - %% This does not strictly imply synchrony, but in practice it seems - %% to be what people assume. - rabbit_queue_collector:delete_all(Collector), + NewState = close_connection(State), ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), - close_connection(State); + NewState; _ -> State end; maybe_close(State) -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index d50b9f31..ec049a1a 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -69,8 +69,8 @@ deliver(QPids, Delivery = #delivery{mandatory = false, deliver(QPids, Delivery) -> {Success, _} = delegate:invoke(QPids, - fun (Pid) -> - rabbit_amqqueue:deliver(Pid, Delivery) + fun (Pid) -> + rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8d9e4ae4..97960571 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1912,7 +1912,7 @@ with_fresh_variable_queue(Fun) -> {len, 0}]), _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)), passed. - + test_variable_queue() -> [passed = with_fresh_variable_queue(F) || F <- [fun test_variable_queue_dynamic_duration_change/1, |