diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-01-07 13:39:36 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-01-07 13:39:36 +0000 |
commit | 50e03766a0f34747f6ea7c28ebc3a2760b66cdeb (patch) | |
tree | fb84d369f50f2819f57d506614a113c65b6be30d | |
parent | b794768e9815bbc3c9c2bfcc04c8c03e9dd29013 (diff) | |
download | rabbitmq-server-50e03766a0f34747f6ea7c28ebc3a2760b66cdeb.tar.gz |
reduce impact of long message queues on selective receives
-rw-r--r-- | src/gen_server2.erl | 120 |
1 files changed, 78 insertions, 42 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index be0ed136..ef14813f 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -3,6 +3,16 @@ %% %% 1) the module name is gen_server2 %% +%% +%% 2) more efficient handling of selective receives in callbacks +%% gen_server2 processes drain their message queue into an internal +%% buffer before invoking any callback module functions. Messages are +%% dequeued from the buffer for processing. Thus the effective message +%% queue of a gen_server2 process is the concatenation of the internal +%% buffer and the real message queue. +%% As a result of the draining, any selective receive invoked inside a +%% callback is less likely to have to scan a large message queue. +%% %% All modifications are (C) 2009 LShift Ltd. %% ``The contents of this file are subject to the Erlang Public License, @@ -263,7 +273,8 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), - loop(Parent, Name, State, Mod, Timeout, Debug). + Queue = queue:new(), + loop(Parent, Name, State, Mod, Timeout, Queue, Debug). %%%======================================================================== %%% Gen-callback functions @@ -280,13 +291,14 @@ init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); init_it(Starter, Parent, Name, Mod, Args, Options) -> Debug = debug_options(Name, Options), + Queue = queue:new(), case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, Debug); + loop(Parent, Name, State, Mod, infinity, Queue, Debug); {ok, State, Timeout} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Debug); + loop(Parent, Name, State, Mod, Timeout, Queue, Debug); {stop, Reason} -> proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); @@ -308,25 +320,40 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, Time, Debug) -> - Msg = receive - Input -> - Input - after Time -> - timeout - end, +loop(Parent, Name, State, Mod, Time, Queue, Debug) -> + receive + Input -> loop(Parent, Name, State, Mod, + Time, queue:in(Input, Queue), Debug) + after 0 -> + case queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, Msg); + {empty, Queue1} -> + receive + Input -> + loop(Parent, Name, State, Mod, + Time, queue:in(Input, Queue1), Debug) + after Time -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, timeout) + end + end + end. + +process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> case Msg of {system, From, Req} -> sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time]); + [Name, State, Mod, Time, Queue]); {'EXIT', Parent, Reason} -> terminate(Reason, Name, Msg, Mod, State, Debug); _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, Time); + handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); _Msg -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, Time, Debug1) + handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) end. %%% --------------------------------------------------- @@ -528,63 +555,70 @@ dispatch({'$gen_cast', Msg}, Mod, State) -> dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). -handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod, _Time) -> +handle_msg({'$gen_call', From, Msg}, + Parent, Name, State, Mod, _Time, Queue) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, []); + loop(Parent, Name, NState, Mod, infinity, Queue, []); {reply, Reply, NState, Time1} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, []); + loop(Parent, Name, NState, Mod, Time1, Queue, []); {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, []); + loop(Parent, Name, NState, Mod, infinity, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, []); + loop(Parent, Name, NState, Mod, Time1, Queue, []); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, [])), reply(From, Reply), exit(R); - Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State) + Other -> handle_common_reply(Other, + Parent, Name, Msg, Mod, State, Queue) end; -handle_msg(Msg, Parent, Name, State, Mod, _Time) -> +handle_msg(Msg, + Parent, Name, State, Mod, _Time, Queue) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). -handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod, _Time, Debug) -> +handle_msg({'$gen_call', From, Msg}, + Parent, Name, State, Mod, _Time, Queue, Debug) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, Debug1); + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); {reply, Reply, NState, Time1} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, Debug1); + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Debug1); + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Debug1); + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), reply(Name, From, Reply, NState, Debug), exit(R); Other -> - handle_common_reply(Other, Parent, Name, Msg, Mod, State, Debug) + handle_common_reply(Other, + Parent, Name, Msg, Mod, State, Queue, Debug) end; -handle_msg(Msg, Parent, Name, State, Mod, _Time, Debug) -> +handle_msg(Msg, + Parent, Name, State, Mod, _Time, Queue, Debug) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Debug). + handle_common_reply(Reply, + Parent, Name, Msg, Mod, State, Queue, Debug). -handle_common_reply(Reply, Parent, Name, Msg, Mod, State) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> case Reply of {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, []); + loop(Parent, Name, NState, Mod, infinity, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, []); + loop(Parent, Name, NState, Mod, Time1, Queue, []); {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, []); {'EXIT', What} -> @@ -593,16 +627,16 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State) -> terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Debug) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> case Reply of {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Debug1); + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Debug1); + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, Debug); {'EXIT', What} -> @@ -620,15 +654,15 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time]) -> - loop(Parent, Name, State, Mod, Time, Debug). +system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> + loop(Parent, Name, State, Mod, Time, Queue, Debug). -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time]) -> +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> terminate(Reason, Name, [], Mod, State, Debug). -system_code_change([Name, State, Mod, Time], _Module, OldVsn, Extra) -> +system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> {ok, [Name, NewState, Mod, Time]}; + {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; Else -> Else end. @@ -795,7 +829,8 @@ name_to_pid(Name) -> %% Status information %%----------------------------------------------------------------- format_status(Opt, StatusData) -> - [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time]] = StatusData, + [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = + StatusData, NameTag = if is_pid(Name) -> pid_to_list(Name); is_atom(Name) -> @@ -816,5 +851,6 @@ format_status(Opt, StatusData) -> [{header, Header}, {data, [{"Status", SysState}, {"Parent", Parent}, - {"Logged events", Log}]} | + {"Logged events", Log}, + {"Queued messages", queue:to_list(Queue)}]} | Specfic]. |