summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-01-07 13:39:36 +0000
committerMatthias Radestock <matthias@lshift.net>2009-01-07 13:39:36 +0000
commit50e03766a0f34747f6ea7c28ebc3a2760b66cdeb (patch)
treefb84d369f50f2819f57d506614a113c65b6be30d
parentb794768e9815bbc3c9c2bfcc04c8c03e9dd29013 (diff)
downloadrabbitmq-server-50e03766a0f34747f6ea7c28ebc3a2760b66cdeb.tar.gz
reduce impact of long message queues on selective receives
-rw-r--r--src/gen_server2.erl120
1 files changed, 78 insertions, 42 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index be0ed136..ef14813f 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -3,6 +3,16 @@
%%
%% 1) the module name is gen_server2
%%
+%%
+%% 2) more efficient handling of selective receives in callbacks
+%% gen_server2 processes drain their message queue into an internal
+%% buffer before invoking any callback module functions. Messages are
+%% dequeued from the buffer for processing. Thus the effective message
+%% queue of a gen_server2 process is the concatenation of the internal
+%% buffer and the real message queue.
+%% As a result of the draining, any selective receive invoked inside a
+%% callback is less likely to have to scan a large message queue.
+%%
%% All modifications are (C) 2009 LShift Ltd.
%% ``The contents of this file are subject to the Erlang Public License,
@@ -263,7 +273,8 @@ enter_loop(Mod, Options, State, ServerName, Timeout) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
- loop(Parent, Name, State, Mod, Timeout, Debug).
+ Queue = queue:new(),
+ loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
%%%========================================================================
%%% Gen-callback functions
@@ -280,13 +291,14 @@ init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name, Mod, Args, Options) ->
Debug = debug_options(Name, Options),
+ Queue = queue:new(),
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, infinity, Debug);
+ loop(Parent, Name, State, Mod, infinity, Queue, Debug);
{ok, State, Timeout} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Debug);
+ loop(Parent, Name, State, Mod, Timeout, Queue, Debug);
{stop, Reason} ->
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
@@ -308,25 +320,40 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
-loop(Parent, Name, State, Mod, Time, Debug) ->
- Msg = receive
- Input ->
- Input
- after Time ->
- timeout
- end,
+loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
+ receive
+ Input -> loop(Parent, Name, State, Mod,
+ Time, queue:in(Input, Queue), Debug)
+ after 0 ->
+ case queue:out(Queue) of
+ {{value, Msg}, Queue1} ->
+ process_msg(Parent, Name, State, Mod,
+ Time, Queue1, Debug, Msg);
+ {empty, Queue1} ->
+ receive
+ Input ->
+ loop(Parent, Name, State, Mod,
+ Time, queue:in(Input, Queue1), Debug)
+ after Time ->
+ process_msg(Parent, Name, State, Mod,
+ Time, Queue1, Debug, timeout)
+ end
+ end
+ end.
+
+process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
- [Name, State, Mod, Time]);
+ [Name, State, Mod, Time, Queue]);
{'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, State, Debug);
_Msg when Debug =:= [] ->
- handle_msg(Msg, Parent, Name, State, Mod, Time);
+ handle_msg(Msg, Parent, Name, State, Mod, Time, Queue);
_Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
Name, {in, Msg}),
- handle_msg(Msg, Parent, Name, State, Mod, Time, Debug1)
+ handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1)
end.
%%% ---------------------------------------------------
@@ -528,63 +555,70 @@ dispatch({'$gen_cast', Msg}, Mod, State) ->
dispatch(Info, Mod, State) ->
Mod:handle_info(Info, State).
-handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod, _Time) ->
+handle_msg({'$gen_call', From, Msg},
+ Parent, Name, State, Mod, _Time, Queue) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, infinity, []);
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
{reply, Reply, NState, Time1} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time1, []);
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, []);
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
{noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, []);
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, [])),
reply(From, Reply),
exit(R);
- Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State)
+ Other -> handle_common_reply(Other,
+ Parent, Name, Msg, Mod, State, Queue)
end;
-handle_msg(Msg, Parent, Name, State, Mod, _Time) ->
+handle_msg(Msg,
+ Parent, Name, State, Mod, _Time, Queue) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State).
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue).
-handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod, _Time, Debug) ->
+handle_msg({'$gen_call', From, Msg},
+ Parent, Name, State, Mod, _Time, Queue, Debug) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, infinity, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
{reply, Reply, NState, Time1} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time1, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
{noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
reply(Name, From, Reply, NState, Debug),
exit(R);
Other ->
- handle_common_reply(Other, Parent, Name, Msg, Mod, State, Debug)
+ handle_common_reply(Other,
+ Parent, Name, Msg, Mod, State, Queue, Debug)
end;
-handle_msg(Msg, Parent, Name, State, Mod, _Time, Debug) ->
+handle_msg(Msg,
+ Parent, Name, State, Mod, _Time, Queue, Debug) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Debug).
+ handle_common_reply(Reply,
+ Parent, Name, Msg, Mod, State, Queue, Debug).
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State) ->
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
case Reply of
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, []);
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
{noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, []);
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, []);
{'EXIT', What} ->
@@ -593,16 +627,16 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State) ->
terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
end.
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Debug) ->
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) ->
case Reply of
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
{noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, Debug);
{'EXIT', What} ->
@@ -620,15 +654,15 @@ reply(Name, {To, Tag}, Reply, State, Debug) ->
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-system_continue(Parent, Debug, [Name, State, Mod, Time]) ->
- loop(Parent, Name, State, Mod, Time, Debug).
+system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) ->
+ loop(Parent, Name, State, Mod, Time, Queue, Debug).
-system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time]) ->
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) ->
terminate(Reason, Name, [], Mod, State, Debug).
-system_code_change([Name, State, Mod, Time], _Module, OldVsn, Extra) ->
+system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
- {ok, NewState} -> {ok, [Name, NewState, Mod, Time]};
+ {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]};
Else -> Else
end.
@@ -795,7 +829,8 @@ name_to_pid(Name) ->
%% Status information
%%-----------------------------------------------------------------
format_status(Opt, StatusData) ->
- [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time]] = StatusData,
+ [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] =
+ StatusData,
NameTag = if is_pid(Name) ->
pid_to_list(Name);
is_atom(Name) ->
@@ -816,5 +851,6 @@ format_status(Opt, StatusData) ->
[{header, Header},
{data, [{"Status", SysState},
{"Parent", Parent},
- {"Logged events", Log}]} |
+ {"Logged events", Log},
+ {"Queued messages", queue:to_list(Queue)}]} |
Specfic].