summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-10-21 15:59:41 +0100
committerMatthias Radestock <matthias@lshift.net>2008-10-21 15:59:41 +0100
commit324972e9e3658d6631358f0906c3d72afe470ae2 (patch)
tree7f0aa853e4c1760b6feac01a42cc8083b80e31d6
parentf78b288923dcbaefa12c7100a9a4adcfe1fd0c3d (diff)
downloadrabbitmq-server-bug19468_channels.tar.gz
hibernate some processes to conserve memorybug19468_channels
In my experiments I encountered situations where rabbit would not recover from a high memory alert even though all messages had been drained from it. By inspecting the running processes I determined that queue and channel processes sometimes hung on to garbage. Erlang's gc is per-process and triggered by process reduction counts, which means an idle process will never perform a gc. This explains the behaviour - the publisher channel goes idle when channel flow control is activated and the queue process goes idle once all messages have been drained from it. Hibernating idle processes forces a gc, as well as generally reducing memory consumption. Currently only channel and queue processes are hibernating, since these are the only two that seemed to be causing problems in my tests. We may want to extend hibernation to other processes in the future.
-rw-r--r--src/buffering_proxy.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl78
2 files changed, 49 insertions, 34 deletions
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl
index dc168608..7707e636 100644
--- a/src/buffering_proxy.erl
+++ b/src/buffering_proxy.erl
@@ -32,6 +32,8 @@
-export([mainloop/4, drain/2]).
-export([proxy_loop/3]).
+-define(HIBERNATE_AFTER, 5000).
+
%%----------------------------------------------------------------------------
start_link(M, A) ->
@@ -59,6 +61,9 @@ mainloop(ProxyPid, Ref, M, State) ->
ProxyPid ! Ref,
NewSt;
Msg -> M:handle_message(Msg, State)
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, mainloop,
+ [ProxyPid, Ref, M, State])
end,
?MODULE:mainloop(ProxyPid, Ref, M, NewState).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7716ef16..e687df84 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -30,6 +30,7 @@
-behaviour(gen_server).
-define(UNSENT_MESSAGE_LIMIT, 100).
+-define(HIBERNATE_AFTER, 1000).
-export([start_link/1]).
@@ -75,7 +76,7 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- round_robin = queue:new()}}.
+ round_robin = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+
+noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
undefined -> not_found;
@@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) ->
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
round_robin = ActiveConsumers}) ->
case lookup_ch(DownPid) of
- not_found -> {noreply, State};
+ not_found -> noreply(State);
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
NewActive = block_consumers(ChPid, ActiveConsumers),
erlang:demonitor(MonitorRef),
@@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
end,
round_robin = NewActive})) of
{continue, NewState} ->
- {noreply, NewState};
+ noreply(NewState);
{stop, NewState} ->
{stop, normal, NewState}
end
@@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% queues discarding the message?
%%
{Delivered, NewState} = attempt_delivery(Txn, Message, State),
- {reply, Delivered, NewState};
+ reply(Delivered, NewState);
handle_call({deliver, Txn, Message}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
{Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
- {reply, Delivered, NewState};
+ reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
ok = commit_work(Txn, qname(State)),
@@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) ->
gen_server:reply(From, ok),
NewState = process_pending(Txn, State),
erase_tx(Txn),
- {noreply, NewState};
+ noreply(NewState);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
@@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From,
persist_auto_ack(QName, Message)
end,
Msg = {QName, self(), NextId, Delivered, Message},
- {reply, {ok, queue:len(BufferTail), Msg},
- State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}};
+ reply({ok, queue:len(BufferTail), Msg},
+ State#q{message_buffer = BufferTail,
+ next_msg_id = NextId + 1});
{empty, _} ->
- {reply, empty, State}
+ reply(empty, State)
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
@@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
round_robin = RoundRobin}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
- {reply, {error, queue_owned_by_another_connection}, State};
+ reply({error, queue_owned_by_another_connection}, State);
ok ->
case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
in_use ->
- {reply, {error, exclusive_consume_unavailable}, State};
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
@@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
end,
round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
ok = maybe_send_reply(ChPid, OkMsg),
- {reply, ok, run_poke_burst(State1)}
+ reply(ok, run_poke_burst(State1))
end
end;
@@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
case lookup_ch(ChPid) of
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
- {reply, ok, State};
+ reply(ok, State);
C = #cr{consumers = Consumers} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
@@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ConsumerTag,
RoundRobin)}) of
{continue, State1} ->
- {reply, ok, State1};
+ reply(ok, State1);
{stop, State1} ->
{stop, normal, ok, State1}
end
@@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
round_robin = RoundRobin}) ->
- {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State};
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
@@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IsUnused = is_unused(),
if
IfEmpty and not(IsEmpty) ->
- {reply, {error, not_empty}, State};
+ reply({error, not_empty}, State);
IfUnused and not(IsUnused) ->
- {reply, {error, in_use}, State};
+ reply({error, in_use}, State);
true ->
{stop, normal, {ok, queue:len(MessageBuffer)}, State}
end;
handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
ok = purge_message_buffer(qname(State), MessageBuffer),
- {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}};
+ reply({ok, queue:len(MessageBuffer)},
+ State#q{message_buffer = queue:new()});
handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
@@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
%% to check, we'd need to hold not just the ch
%% pid for each consumer, but also its reader
%% pid...
- {reply, locked, State};
+ reply(locked, State);
ok ->
- {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}}
+ reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
end;
{ReaderPid, _MonitorRef} ->
- {reply, ok, State};
+ reply(ok, State);
_ ->
- {reply, locked, State}
+ reply(locked, State)
end.
handle_cast({deliver, Txn, Message}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
- {noreply, NewState};
+ noreply(NewState);
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
- {noreply, State};
+ noreply(State);
C = #cr{unacked_messages = UAM} ->
{Acked, Remaining} = collect_messages(MsgIds, UAM),
persist_acks(Txn, qname(State), Acked),
@@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
_ ->
record_pending_acks(Txn, ChPid, MsgIds)
end,
- {noreply, State}
+ noreply(State)
end;
handle_cast({rollback, Txn}, State) ->
ok = rollback_work(Txn, qname(State)),
erase_tx(Txn),
- {noreply, State};
+ noreply(State);
handle_cast({redeliver, Messages}, State) ->
- {noreply, deliver_or_enqueue_n(Messages, State)};
+ noreply(deliver_or_enqueue_n(Messages, State));
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
- {noreply, State};
+ noreply(State);
C = #cr{unacked_messages = UAM} ->
{Messages, NewUAM} = collect_messages(MsgIds, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- {noreply, deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State)}
+ noreply(deliver_or_enqueue_n(
+ [{Message, true} || Message <- Messages], State))
end;
handle_cast({notify_sent, ChPid}, State) ->
case lookup_ch(ChPid) of
- not_found -> {noreply, State};
+ not_found -> noreply(State);
T = #cr{unsent_message_count =Count} ->
- {noreply, possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State)}
+ noreply(possibly_unblock(
+ T#cr{unsent_message_count = Count - 1},
+ State))
end.
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
@@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
+handle_info(timeout, State) ->
+ {noreply, State, hibernate};
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.