diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-01 15:15:22 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-01 15:15:22 +0100 |
commit | bbccf25f6ee73eacd8bee60a1eaec6b520bb1f69 (patch) | |
tree | 2aeb668bdb07cf717f8e90fb5c2b84cb930cda6f | |
parent | 4bd2badfe32e1ba403843313428aa296421de189 (diff) | |
download | rabbitmq-server-bbccf25f6ee73eacd8bee60a1eaec6b520bb1f69.tar.gz |
Delay the issuing of channel.open_ok if under memory pressure. Also revert changes to tcp_acceptor. Also add test
-rw-r--r-- | src/rabbit_alarm.erl | 11 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 19 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 25 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 21 |
4 files changed, 48 insertions, 28 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 7e96d9a3..53c713e6 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -47,7 +47,7 @@ -type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). +-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). -endif. @@ -67,9 +67,9 @@ stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). register(Pid, HighMemMFA) -> - ok = gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}, - infinity). + gen_event:call(alarm_handler, ?MODULE, + {register, Pid, HighMemMFA}, + infinity). %%---------------------------------------------------------------------------- @@ -84,7 +84,8 @@ handle_call({register, Pid, {M, F, A} = HighMemMFA}, false -> ok end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), - {ok, ok, State#alarms{alertees = NewAlertees}}; + {ok, State#alarms.vm_memory_high_watermark, + State#alarms{alertees = NewAlertees}}; handle_call(_Request, State) -> {ok, not_understood, State}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d53711e8..d34c5a72 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -220,8 +220,17 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast({conserve_memory, Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, State = #ch{state = starting}) -> + case Conserve of + true -> noreply(State); + false -> ok = rabbit_writer:send_command(State#ch.writer_pid, + #'channel.open_ok'{}), + noreply(State#ch{state = running}) + end; +handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) -> flow_control(not Conserve, State); +handle_cast({conserve_memory, _Conserve}, State) -> + noreply(State); handle_cast({flow_timeout, Ref}, State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> @@ -392,8 +401,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - {reply, #'channel.open_ok'{}, State#ch{state = running}}; + case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of + true -> {noreply, State}; + false -> {reply, #'channel.open_ok'{}, State#ch{state = running}} + end; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -412,7 +423,7 @@ handle_method(#'access.request'{},_, State) -> handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) -> rabbit_misc:protocol_error( - precondition_failed, + command_invalid, "basic.publish received after channel.flow_ok{active=false}", []); handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 05efdcac..8f40d3fa 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -957,6 +957,31 @@ test_memory_pressure() -> throw(channel_failed_to_exit) end, + alarm_handler:set_alarm({vm_memory_high_watermark, []}), + Me = self(), + Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), + Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, + self()), + ok = rabbit_channel:do(Ch4, #'channel.open'{}), + MRef4 = erlang:monitor(process, Ch4), + Writer4 ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) + after 0 -> ok + end, + alarm_handler:clear_alarm(vm_memory_high_watermark), + Writer4 ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + rabbit_channel:shutdown(Ch4), + receive {'DOWN', MRef4, process, Ch4, normal} -> + ok + after 1000 -> + throw(channel_failed_to_exit) + end, + passed. test_delegates_async(SecondaryNode) -> diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 10efee42..cc4982c9 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -38,25 +38,18 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([conserve_memory/2]). - --record(state, {callback, sock, ref, conserving, needs_accept}). +-record(state, {callback, sock, ref}). %%-------------------------------------------------------------------- start_link(Callback, LSock) -> gen_server:start_link(?MODULE, {Callback, LSock}, []). -conserve_memory(Pid, Conserve) -> - gen_server:cast(Pid, {conserve_memory, Conserve}). - %%-------------------------------------------------------------------- init({Callback, LSock}) -> gen_server:cast(self(), accept), - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - {ok, #state{callback=Callback, sock=LSock, conserving = false, - needs_accept = false}}. + {ok, #state{callback=Callback, sock=LSock}}. handle_call(_Request, _From, State) -> {noreply, State}. @@ -64,14 +57,6 @@ handle_call(_Request, _From, State) -> handle_cast(accept, State) -> accept(State); -handle_cast({conserve_memory, Conserve}, State) -> - State1 = case State#state.needs_accept andalso not Conserve of - true -> gen_server:cast(self(), accept), - State#state{needs_accept = false}; - false -> State - end, - {noreply, State1#state{conserving = Conserve}}; - handle_cast(_Msg, State) -> {noreply, State}. @@ -125,8 +110,6 @@ code_change(_OldVsn, State, _Extra) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). -accept(State = #state{conserving = true}) -> - {noreply, State#state{needs_accept = true}}; accept(State = #state{sock=LSock}) -> ok = file_handle_cache:obtain(), case prim_inet:async_accept(LSock, -1) of |