diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-03-03 14:30:18 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-03-03 14:30:18 +0000 |
commit | ca3b2136fabc41aabc5b3f712bfeecd16103e33d (patch) | |
tree | 81a2c5ec2dfb3b617a676b84f8ffe1867236622e | |
parent | 42a988dbb6f3ef49b6c6d0683bf75b225b4cdb0b (diff) | |
download | rabbitmq-server-ca3b2136fabc41aabc5b3f712bfeecd16103e33d.tar.gz |
Made the very generic invoke call into a slightly more specific flush call
-rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 17 |
3 files changed, 18 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 285445f2..4abd0cad 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,7 +40,7 @@ -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2, invoke/2]). +-export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -107,7 +107,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(invoke/2 :: (pid(), (fun ((pid()) -> any()))) -> 'ok'). +-spec(flush_all/2 :: (pid(), [pid()]) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -335,8 +335,11 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 7, {unblock, ChPid}). -invoke(QPid, Fun) -> - gen_server2:cast(QPid, {invoke, Fun}). +flush_all(ChPid, QPids) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end, + QPids). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 26914867..19cb5c71 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -828,8 +828,8 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} end)); -handle_cast({invoke, Fun}, State) -> - Fun(self()), +handle_cast({flush, ChPid}, State) -> + ok = rabbit_channel:flushed(ChPid, self()), noreply(State). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 88e20936..766dbdbe 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/5, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, conserve_memory/2]). +-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([init/1, terminate/2, code_change/3, @@ -78,6 +78,7 @@ -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). @@ -113,6 +114,9 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> conserve_memory(Pid, Conserve) -> gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}). +flushed(Pid, QPid) -> + gen_server2:cast(Pid, {flushed, QPid}). + list() -> pg_local:get_members(rabbit_channels). @@ -192,7 +196,7 @@ handle_cast({method, Method, Content}, State) -> {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_cast({from_queue, QPid}, State) -> +handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; handle_cast(terminate, State) -> @@ -825,12 +829,9 @@ handle_method(#'channel.flow'{active = false}, _, false -> LimiterPid end, ok = rabbit_limiter:block(LimiterPid1), - Me = self(), - Fun = fun(QPid) -> gen_server2:cast(Me, {from_queue, QPid}) end, - Queues = [begin MRef = erlang:monitor(process, QPid), - rabbit_amqqueue:invoke(QPid, Fun), - {QPid, MRef} - end || QPid <- consumer_queues(Consumers)], + QPids = consumer_queues(Consumers), + Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids], + ok = rabbit_amqqueue:flush_all(self(), QPids), case Queues =:= [] of true -> {reply, #'channel.flow_ok'{active = false}, State}; false -> {noreply, State#ch{limiter_pid = LimiterPid1, |