summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-03 14:30:18 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-03 14:30:18 +0000
commitca3b2136fabc41aabc5b3f712bfeecd16103e33d (patch)
tree81a2c5ec2dfb3b617a676b84f8ffe1867236622e
parent42a988dbb6f3ef49b6c6d0683bf75b225b4cdb0b (diff)
downloadrabbitmq-server-ca3b2136fabc41aabc5b3f712bfeecd16103e33d.tar.gz
Made the very generic invoke call into a slightly more specific flush call
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_channel.erl17
3 files changed, 18 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 285445f2..4abd0cad 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,7 +40,7 @@
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2, invoke/2]).
+-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -107,7 +107,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(invoke/2 :: (pid(), (fun ((pid()) -> any()))) -> 'ok').
+-spec(flush_all/2 :: (pid(), [pid()]) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -335,8 +335,11 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 7, {unblock, ChPid}).
-invoke(QPid, Fun) ->
- gen_server2:cast(QPid, {invoke, Fun}).
+flush_all(ChPid, QPids) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end,
+ QPids).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 26914867..19cb5c71 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -828,8 +828,8 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
end));
-handle_cast({invoke, Fun}, State) ->
- Fun(self()),
+handle_cast({flush, ChPid}, State) ->
+ ok = rabbit_channel:flushed(ChPid, self()),
noreply(State).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 88e20936..766dbdbe 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-export([start_link/5, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, conserve_memory/2]).
+-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([init/1, terminate/2, code_change/3,
@@ -78,6 +78,7 @@
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
@@ -113,6 +114,9 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
conserve_memory(Pid, Conserve) ->
gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).
+flushed(Pid, QPid) ->
+ gen_server2:cast(Pid, {flushed, QPid}).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -192,7 +196,7 @@ handle_cast({method, Method, Content}, State) ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_cast({from_queue, QPid}, State) ->
+handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State)};
handle_cast(terminate, State) ->
@@ -825,12 +829,9 @@ handle_method(#'channel.flow'{active = false}, _,
false -> LimiterPid
end,
ok = rabbit_limiter:block(LimiterPid1),
- Me = self(),
- Fun = fun(QPid) -> gen_server2:cast(Me, {from_queue, QPid}) end,
- Queues = [begin MRef = erlang:monitor(process, QPid),
- rabbit_amqqueue:invoke(QPid, Fun),
- {QPid, MRef}
- end || QPid <- consumer_queues(Consumers)],
+ QPids = consumer_queues(Consumers),
+ Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids],
+ ok = rabbit_amqqueue:flush_all(self(), QPids),
case Queues =:= [] of
true -> {reply, #'channel.flow_ok'{active = false}, State};
false -> {noreply, State#ch{limiter_pid = LimiterPid1,