diff options
authorMatthias Radestock <>2010-08-02 10:45:59 +0100
committerMatthias Radestock <>2010-08-02 10:45:59 +0100
commit6a555a83053ef506844f53f850b66a217e05be1d (patch)
parent48228327e73d965cdeb57549ae33f5b371bf554a (diff)
parent34db93f494f6b2d91d077001352b0d7ce5b68297 (diff)
merge default into bug23027
2 files changed, 30 insertions, 89 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c4ff361d..1928e21d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,11 +36,9 @@
-export([start_link/6, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
+-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
@@ -48,12 +46,9 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, flow}).
--record(flow, {server, client, pending}).
+ consumer_mapping, blocking, queue_collector_pid}).
--define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
@@ -87,9 +82,7 @@
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
@@ -120,15 +113,9 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
-conserve_memory(Pid, Conserve) ->
- gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-flow_timeout(Pid, Ref) ->
- gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).
list() ->
@@ -170,9 +157,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
- queue_collector_pid = CollectorPid,
- flow = #flow{server = true, client = true,
- pending = none}},
+ queue_collector_pid = CollectorPid},
@@ -223,27 +208,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
- noreply(State);
-handle_cast({conserve_memory, false}, State = #ch{state = starting}) ->
- ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}),
- noreply(State#ch{state = running});
-handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
- flow_control(not Conserve, State);
-handle_cast({conserve_memory, _Conserve}, State) ->
- noreply(State);
-handle_cast({flow_timeout, Ref},
- State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
- {stop, normal, terminating(
- rabbit_misc:amqp_error(
- precondition_failed,
- "timeout waiting for channel.flow_ok{active=~w}",
- [not Flow], none), State)};
-handle_cast({flow_timeout, _Ref}, State) ->
- {noreply, State}.
+ noreply(State1#ch{next_tag = DeliveryTag + 1}).
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -383,10 +348,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
handle_method(#''{}, _, State = #ch{state = starting}) ->
- case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
- true -> {noreply, State};
- false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
- end;
+ {reply, #'channel.open_ok'{}, State#ch{state = running}};
handle_method(#''{}, _, _State) ->
@@ -403,10 +365,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
- rabbit_misc:protocol_error(
- command_invalid,
- "basic.publish received after channel.flow_ok{active=false}", []);
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -863,48 +821,12 @@ handle_method(#'channel.flow'{active = false}, _,
blocking = dict:from_list(Queues)}}
-handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Active, client = Flow,
- pending = {_Ref, TRef}} = F})
- when Flow =:= not Active ->
- {ok, cancel} = timer:cancel(TRef),
- {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
-handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Flow, client = Flow,
- pending = {_Ref, TRef}}})
- when Flow =:= not Active ->
- {ok, cancel} = timer:cancel(TRef),
- {noreply, issue_flow(Flow, State)};
-handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
- rabbit_misc:protocol_error(
- command_invalid, "unsolicited channel.flow_ok", []);
-handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
- rabbit_misc:protocol_error(
- command_invalid,
- "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);
handle_method(_MethodRecord, _Content, _State) ->
command_invalid, "unimplemented method", []).
-flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
- when Flow =:= not Active ->
- ok = clear_permission_cache(),
- noreply(issue_flow(Active, State));
-flow_control(Active, State = #ch{flow = F}) ->
- noreply(State#ch{flow = F#flow{server = Active}}).
-issue_flow(Active, State) ->
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = Active}),
- Ref = make_ref(),
- {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
- [self(), Ref]),
- State#ch{flow = #flow{server = Active, client = not Active,
- pending = {Ref, TRef}}}.
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 9603faf5..46171aec 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -39,7 +39,7 @@
-export([init/1, mainloop/3]).
+-export([conserve_memory/2, server_properties/0]).
@@ -58,7 +58,7 @@
-record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector}).
+ queue_collector, conserving_memory}).
[pid, address, port, peer_address, peer_port,
@@ -142,6 +142,7 @@
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
@@ -208,6 +209,10 @@ teardown_profiling(Value) ->
fprof:analyse([{dest, []}, {cols, 100}])
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
+ ok.
server_properties() ->
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -295,6 +300,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
+ {conserve_memory, Conserve} ->
+ mainloop(Parent, Deb, internal_conserve_memory(Conserve, State));
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -348,11 +355,14 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit({unexpected_message, Other})
-switch_callback(OldState, NewCallback, Length) ->
+switch_callback(State = #v1{conserving_memory = true}, Callback, Length) ->
+ %% TODO: pause heartbeat monitor
+ %% TODO: only do this after receiving a content-bearing method
+ State#v1{callback = {Callback, Length}, recv_ref = none};
+switch_callback(State, Callback, Length) ->
Ref = inet_op(fun () -> rabbit_net:async_recv(
- OldState#v1.sock, Length, infinity) end),
- OldState#v1{callback = NewCallback,
- recv_ref = Ref}.
+ State#v1.sock, Length, infinity) end),
+ State#v1{callback = Callback, recv_ref = Ref}.
terminate(Explanation, State = #v1{connection_state = running}) ->
{normal, send_exception(State, 0,
@@ -361,6 +371,14 @@ terminate(Explanation, State = #v1{connection_state = running}) ->
terminate(_Explanation, State) ->
{force, State}.
+internal_conserve_memory(false, State = #v1{conserving_memory = true,
+ callback = {Callback, Length},
+ recv_ref = none}) ->
+ %% TODO: resume heartbeat monitor
+ switch_callback(State#v1{conserving_memory = false}, Callback, Length);
+internal_conserve_memory(Conserve, State) ->
+ State#v1{conserving_memory = Conserve}.
close_connection(State = #v1{connection = #connection{
timeout_sec = TimeoutSec}}) ->
%% We terminate the connection after the specified interval, but
@@ -670,6 +688,7 @@ handle_method0(#''{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
State#v1{connection_state = running,
connection = NewConnection};