summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-10-16 05:45:08 +0100
committerMatthias Radestock <matthias@lshift.net>2008-10-16 05:45:08 +0100
commitb656422c2f7d2fdfdc35f97fd1c30c91203749c5 (patch)
tree76696b889ee6c7494c99b629aa6a3e6c0d1029f8
parenteb13eb2bea603a0a1ba058f881c42052f323b163 (diff)
downloadrabbitmq-server-b656422c2f7d2fdfdc35f97fd1c30c91203749c5.tar.gz
wire up channels to rabbit_alarm
We propagate changes in the high memory alarm status as channel.flow messages to the client. The channel.flow_ok replies are simply accepted.
-rw-r--r--src/rabbit_channel.erl18
1 files changed, 17 insertions, 1 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5cc07aed..edfd1787 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -28,7 +28,7 @@
-include("rabbit.hrl").
-export([start_link/4, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4]).
+-export([send_command/2, deliver/4, conserve_memory/2]).
%% callbacks
-export([init/2, handle_message/2]).
@@ -49,6 +49,7 @@
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
-endif.
@@ -77,11 +78,18 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
Pid ! {deliver, ConsumerTag, AckRequired, Msg},
ok.
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
+ ok.
+
%%---------------------------------------------------------------------------
init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
+ %% this is bypassing the proxy so alarms can "jump the queue" and
+ %% be handled promptly
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
#ch{state = starting,
proxy_pid = ProxyPid,
reader_pid = ReaderPid,
@@ -129,6 +137,11 @@ handle_message({deliver, ConsumerTag, AckRequired, Msg},
true, ConsumerTag, DeliveryTag, Msg),
State1#ch{next_tag = DeliveryTag + 1};
+handle_message({conserve_memory, Conserve}, State) ->
+ ok = rabbit_writer:send_command(
+ State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
+ State;
+
handle_message({'EXIT', _Pid, Reason}, State) ->
terminate(Reason, State);
@@ -630,6 +643,9 @@ handle_method(#'channel.flow'{active = _}, _, State) ->
%% FIXME: implement
{reply, #'channel.flow_ok'{active = true}, State};
+handle_method(#'channel.flow_ok'{active = _}, _, State) ->
+ {noreply, State};
+
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).