summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-01 15:15:22 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-01 15:15:22 +0100
commitbbccf25f6ee73eacd8bee60a1eaec6b520bb1f69 (patch)
tree2aeb668bdb07cf717f8e90fb5c2b84cb930cda6f
parent4bd2badfe32e1ba403843313428aa296421de189 (diff)
downloadrabbitmq-server-bbccf25f6ee73eacd8bee60a1eaec6b520bb1f69.tar.gz
Delay the issuing of channel.open_ok if under memory pressure. Also revert changes to tcp_acceptor. Also add test
-rw-r--r--src/rabbit_alarm.erl11
-rw-r--r--src/rabbit_channel.erl19
-rw-r--r--src/rabbit_tests.erl25
-rw-r--r--src/tcp_acceptor.erl21
4 files changed, 48 insertions, 28 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 7e96d9a3..53c713e6 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -47,7 +47,7 @@
-type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
+-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
-endif.
@@ -67,9 +67,9 @@ stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
register(Pid, HighMemMFA) ->
- ok = gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA},
- infinity).
+ gen_event:call(alarm_handler, ?MODULE,
+ {register, Pid, HighMemMFA},
+ infinity).
%%----------------------------------------------------------------------------
@@ -84,7 +84,8 @@ handle_call({register, Pid, {M, F, A} = HighMemMFA},
false -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
- {ok, ok, State#alarms{alertees = NewAlertees}};
+ {ok, State#alarms.vm_memory_high_watermark,
+ State#alarms{alertees = NewAlertees}};
handle_call(_Request, State) ->
{ok, not_understood, State}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d53711e8..d34c5a72 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -220,8 +220,17 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, Conserve}, State) ->
+handle_cast({conserve_memory, Conserve}, State = #ch{state = starting}) ->
+ case Conserve of
+ true -> noreply(State);
+ false -> ok = rabbit_writer:send_command(State#ch.writer_pid,
+ #'channel.open_ok'{}),
+ noreply(State#ch{state = running})
+ end;
+handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
flow_control(not Conserve, State);
+handle_cast({conserve_memory, _Conserve}, State) ->
+ noreply(State);
handle_cast({flow_timeout, Ref},
State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
@@ -392,8 +401,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
+ true -> {noreply, State};
+ false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
+ end;
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -412,7 +423,7 @@ handle_method(#'access.request'{},_, State) ->
handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
rabbit_misc:protocol_error(
- precondition_failed,
+ command_invalid,
"basic.publish received after channel.flow_ok{active=false}", []);
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 05efdcac..8f40d3fa 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -957,6 +957,31 @@ test_memory_pressure() ->
throw(channel_failed_to_exit)
end,
+ alarm_handler:set_alarm({vm_memory_high_watermark, []}),
+ Me = self(),
+ Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
+ Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>,
+ self()),
+ ok = rabbit_channel:do(Ch4, #'channel.open'{}),
+ MRef4 = erlang:monitor(process, Ch4),
+ Writer4 ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok)
+ after 0 -> ok
+ end,
+ alarm_handler:clear_alarm(vm_memory_high_watermark),
+ Writer4 ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'channel.open_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_channel_open_ok)
+ end,
+ rabbit_channel:shutdown(Ch4),
+ receive {'DOWN', MRef4, process, Ch4, normal} ->
+ ok
+ after 1000 ->
+ throw(channel_failed_to_exit)
+ end,
+
passed.
test_delegates_async(SecondaryNode) ->
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 10efee42..cc4982c9 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -38,25 +38,18 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([conserve_memory/2]).
-
--record(state, {callback, sock, ref, conserving, needs_accept}).
+-record(state, {callback, sock, ref}).
%%--------------------------------------------------------------------
start_link(Callback, LSock) ->
gen_server:start_link(?MODULE, {Callback, LSock}, []).
-conserve_memory(Pid, Conserve) ->
- gen_server:cast(Pid, {conserve_memory, Conserve}).
-
%%--------------------------------------------------------------------
init({Callback, LSock}) ->
gen_server:cast(self(), accept),
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- {ok, #state{callback=Callback, sock=LSock, conserving = false,
- needs_accept = false}}.
+ {ok, #state{callback=Callback, sock=LSock}}.
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -64,14 +57,6 @@ handle_call(_Request, _From, State) ->
handle_cast(accept, State) ->
accept(State);
-handle_cast({conserve_memory, Conserve}, State) ->
- State1 = case State#state.needs_accept andalso not Conserve of
- true -> gen_server:cast(self(), accept),
- State#state{needs_accept = false};
- false -> State
- end,
- {noreply, State1#state{conserving = Conserve}};
-
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -125,8 +110,6 @@ code_change(_OldVsn, State, _Extra) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-accept(State = #state{conserving = true}) ->
- {noreply, State#state{needs_accept = true}};
accept(State = #state{sock=LSock}) ->
ok = file_handle_cache:obtain(),
case prim_inet:async_accept(LSock, -1) of