diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-10-16 05:45:08 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-10-16 05:45:08 +0100 |
commit | b656422c2f7d2fdfdc35f97fd1c30c91203749c5 (patch) | |
tree | 76696b889ee6c7494c99b629aa6a3e6c0d1029f8 | |
parent | eb13eb2bea603a0a1ba058f881c42052f323b163 (diff) | |
download | rabbitmq-server-b656422c2f7d2fdfdc35f97fd1c30c91203749c5.tar.gz |
wire up channels to rabbit_alarm
We propagate changes in the high memory alarm status as channel.flow
messages to the client. The channel.flow_ok replies are simply
accepted.
-rw-r--r-- | src/rabbit_channel.erl | 18 |
1 files changed, 17 insertions, 1 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5cc07aed..edfd1787 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -28,7 +28,7 @@ -include("rabbit.hrl"). -export([start_link/4, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4]). +-export([send_command/2, deliver/4, conserve_memory/2]). %% callbacks -export([init/2, handle_message/2]). @@ -49,6 +49,7 @@ -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). -endif. @@ -77,11 +78,18 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> Pid ! {deliver, ConsumerTag, AckRequired, Msg}, ok. +conserve_memory(Pid, Conserve) -> + Pid ! {conserve_memory, Conserve}, + ok. + %%--------------------------------------------------------------------------- init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), + %% this is bypassing the proxy so alarms can "jump the queue" and + %% be handled promptly + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), #ch{state = starting, proxy_pid = ProxyPid, reader_pid = ReaderPid, @@ -129,6 +137,11 @@ handle_message({deliver, ConsumerTag, AckRequired, Msg}, true, ConsumerTag, DeliveryTag, Msg), State1#ch{next_tag = DeliveryTag + 1}; +handle_message({conserve_memory, Conserve}, State) -> + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), + State; + handle_message({'EXIT', _Pid, Reason}, State) -> terminate(Reason, State); @@ -630,6 +643,9 @@ handle_method(#'channel.flow'{active = _}, _, State) -> %% FIXME: implement {reply, #'channel.flow_ok'{active = true}, State}; +handle_method(#'channel.flow_ok'{active = _}, _, State) -> + {noreply, State}; + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). |