diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-03-03 14:30:18 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-03-03 14:30:18 +0000 |
commit | ca3b2136fabc41aabc5b3f712bfeecd16103e33d (patch) | |
tree | 81a2c5ec2dfb3b617a676b84f8ffe1867236622e /src/rabbit_channel.erl | |
parent | 42a988dbb6f3ef49b6c6d0683bf75b225b4cdb0b (diff) | |
download | rabbitmq-server-ca3b2136fabc41aabc5b3f712bfeecd16103e33d.tar.gz |
Made the very generic invoke call into a slightly more specific flush call
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r-- | src/rabbit_channel.erl | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 88e20936..766dbdbe 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/5, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, conserve_memory/2]). +-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([init/1, terminate/2, code_change/3, @@ -78,6 +78,7 @@ -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). @@ -113,6 +114,9 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> conserve_memory(Pid, Conserve) -> gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}). +flushed(Pid, QPid) -> + gen_server2:cast(Pid, {flushed, QPid}). + list() -> pg_local:get_members(rabbit_channels). @@ -192,7 +196,7 @@ handle_cast({method, Method, Content}, State) -> {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_cast({from_queue, QPid}, State) -> +handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; handle_cast(terminate, State) -> @@ -825,12 +829,9 @@ handle_method(#'channel.flow'{active = false}, _, false -> LimiterPid end, ok = rabbit_limiter:block(LimiterPid1), - Me = self(), - Fun = fun(QPid) -> gen_server2:cast(Me, {from_queue, QPid}) end, - Queues = [begin MRef = erlang:monitor(process, QPid), - rabbit_amqqueue:invoke(QPid, Fun), - {QPid, MRef} - end || QPid <- consumer_queues(Consumers)], + QPids = consumer_queues(Consumers), + Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids], + ok = rabbit_amqqueue:flush_all(self(), QPids), case Queues =:= [] of true -> {reply, #'channel.flow_ok'{active = false}, State}; false -> {noreply, State#ch{limiter_pid = LimiterPid1, |