diff options
-rw-r--r-- | src/rabbit_channel.erl | 88 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 31 |
2 files changed, 30 insertions, 89 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c4ff361d..1928e21d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,11 +36,9 @@ -behaviour(gen_server2). -export([start_link/6, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). +-export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([flow_timeout/2]). - -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -48,12 +46,9 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, flow}). - --record(flow, {server, client, pending}). + consumer_mapping, blocking, queue_collector_pid}). -define(MAX_PERMISSION_CACHE_SIZE, 12). --define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds -define(INFO_KEYS, [pid, @@ -87,9 +82,7 @@ -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(flow_timeout/2 :: (pid(), ref()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). @@ -120,15 +113,9 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). -conserve_memory(Pid, Conserve) -> - gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}). - flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -flow_timeout(Pid, Ref) -> - gen_server2:pcast(Pid, 7, {flow_timeout, Ref}). - list() -> pg_local:get_members(rabbit_channels). @@ -170,9 +157,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = dict:new(), - queue_collector_pid = CollectorPid, - flow = #flow{server = true, client = true, - pending = none}}, + queue_collector_pid = CollectorPid}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -223,27 +208,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), - noreply(State1#ch{next_tag = DeliveryTag + 1}); - -handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> - noreply(State); -handle_cast({conserve_memory, false}, State = #ch{state = starting}) -> - ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}), - noreply(State#ch{state = running}); -handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) -> - flow_control(not Conserve, State); -handle_cast({conserve_memory, _Conserve}, State) -> - noreply(State); - -handle_cast({flow_timeout, Ref}, - State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> - {stop, normal, terminating( - rabbit_misc:amqp_error( - precondition_failed, - "timeout waiting for channel.flow_ok{active=~w}", - [not Flow], none), State)}; -handle_cast({flow_timeout, _Ref}, State) -> - {noreply, State}. + noreply(State1#ch{next_tag = DeliveryTag + 1}). handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -383,10 +348,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of - true -> {noreply, State}; - false -> {reply, #'channel.open_ok'{}, State#ch{state = running}} - end; + {reply, #'channel.open_ok'{}, State#ch{state = running}}; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -403,10 +365,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) -> - rabbit_misc:protocol_error( - command_invalid, - "basic.publish received after channel.flow_ok{active=false}", []); handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, @@ -863,48 +821,12 @@ handle_method(#'channel.flow'{active = false}, _, blocking = dict:from_list(Queues)}} end; -handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{flow = #flow{server = Active, client = Flow, - pending = {_Ref, TRef}} = F}) - when Flow =:= not Active -> - {ok, cancel} = timer:cancel(TRef), - {noreply, State#ch{flow = F#flow{client = Active, pending = none}}}; -handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{flow = #flow{server = Flow, client = Flow, - pending = {_Ref, TRef}}}) - when Flow =:= not Active -> - {ok, cancel} = timer:cancel(TRef), - {noreply, issue_flow(Flow, State)}; -handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) -> - rabbit_misc:protocol_error( - command_invalid, "unsolicited channel.flow_ok", []); -handle_method(#'channel.flow_ok'{active = Active}, _, _State) -> - rabbit_misc:protocol_error( - command_invalid, - "received channel.flow_ok{active=~w} has incorrect polarity", [Active]); - handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). %%---------------------------------------------------------------------------- -flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}}) - when Flow =:= not Active -> - ok = clear_permission_cache(), - noreply(issue_flow(Active, State)); -flow_control(Active, State = #ch{flow = F}) -> - noreply(State#ch{flow = F#flow{server = Active}}). - -issue_flow(Active, State) -> - ok = rabbit_writer:send_command( - State#ch.writer_pid, #'channel.flow'{active = Active}), - Ref = make_ref(), - {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, - [self(), Ref]), - State#ch{flow = #flow{server = Active, client = not Active, - pending = {Ref, TRef}}}. - binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9603faf5..46171aec 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -39,7 +39,7 @@ -export([init/1, mainloop/3]). --export([server_properties/0]). +-export([conserve_memory/2, server_properties/0]). -export([analyze_frame/3]). @@ -58,7 +58,7 @@ %--------------------------------------------------------------------------- -record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector}). + queue_collector, conserving_memory}). -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, @@ -142,6 +142,7 @@ -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). -endif. @@ -208,6 +209,10 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +conserve_memory(Pid, Conserve) -> + Pid ! {conserve_memory, Conserve}, + ok. + server_properties() -> {ok, Product} = application:get_key(rabbit, id), {ok, Version} = application:get_key(rabbit, vsn), @@ -295,6 +300,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end; {inet_async, Sock, Ref, {error, Reason}} -> throw({inet_error, Reason}); + {conserve_memory, Conserve} -> + mainloop(Parent, Deb, internal_conserve_memory(Conserve, State)); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -348,11 +355,14 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit({unexpected_message, Other}) end. -switch_callback(OldState, NewCallback, Length) -> +switch_callback(State = #v1{conserving_memory = true}, Callback, Length) -> + %% TODO: pause heartbeat monitor + %% TODO: only do this after receiving a content-bearing method + State#v1{callback = {Callback, Length}, recv_ref = none}; +switch_callback(State, Callback, Length) -> Ref = inet_op(fun () -> rabbit_net:async_recv( - OldState#v1.sock, Length, infinity) end), - OldState#v1{callback = NewCallback, - recv_ref = Ref}. + State#v1.sock, Length, infinity) end), + State#v1{callback = Callback, recv_ref = Ref}. terminate(Explanation, State = #v1{connection_state = running}) -> {normal, send_exception(State, 0, @@ -361,6 +371,14 @@ terminate(Explanation, State = #v1{connection_state = running}) -> terminate(_Explanation, State) -> {force, State}. +internal_conserve_memory(false, State = #v1{conserving_memory = true, + callback = {Callback, Length}, + recv_ref = none}) -> + %% TODO: resume heartbeat monitor + switch_callback(State#v1{conserving_memory = false}, Callback, Length); +internal_conserve_memory(Conserve, State) -> + State#v1{conserving_memory = Conserve}. + close_connection(State = #v1{connection = #connection{ timeout_sec = TimeoutSec}}) -> %% We terminate the connection after the specified interval, but @@ -670,6 +688,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), State#v1{connection_state = running, connection = NewConnection}; handle_method0(#'connection.close'{}, |