summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-03 14:30:18 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-03 14:30:18 +0000
commitca3b2136fabc41aabc5b3f712bfeecd16103e33d (patch)
tree81a2c5ec2dfb3b617a676b84f8ffe1867236622e /src/rabbit_channel.erl
parent42a988dbb6f3ef49b6c6d0683bf75b225b4cdb0b (diff)
downloadrabbitmq-server-ca3b2136fabc41aabc5b3f712bfeecd16103e33d.tar.gz
Made the very generic invoke call into a slightly more specific flush call
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl17
1 files changed, 9 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 88e20936..766dbdbe 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-export([start_link/5, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, conserve_memory/2]).
+-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([init/1, terminate/2, code_change/3,
@@ -78,6 +78,7 @@
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
@@ -113,6 +114,9 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
conserve_memory(Pid, Conserve) ->
gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).
+flushed(Pid, QPid) ->
+ gen_server2:cast(Pid, {flushed, QPid}).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -192,7 +196,7 @@ handle_cast({method, Method, Content}, State) ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_cast({from_queue, QPid}, State) ->
+handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State)};
handle_cast(terminate, State) ->
@@ -825,12 +829,9 @@ handle_method(#'channel.flow'{active = false}, _,
false -> LimiterPid
end,
ok = rabbit_limiter:block(LimiterPid1),
- Me = self(),
- Fun = fun(QPid) -> gen_server2:cast(Me, {from_queue, QPid}) end,
- Queues = [begin MRef = erlang:monitor(process, QPid),
- rabbit_amqqueue:invoke(QPid, Fun),
- {QPid, MRef}
- end || QPid <- consumer_queues(Consumers)],
+ QPids = consumer_queues(Consumers),
+ Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids],
+ ok = rabbit_amqqueue:flush_all(self(), QPids),
case Queues =:= [] of
true -> {reply, #'channel.flow_ok'{active = false}, State};
false -> {noreply, State#ch{limiter_pid = LimiterPid1,