diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-23 15:10:02 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-23 15:10:02 +0100 |
commit | 80975016a35d4752651cc6922d1360fa3e71c430 (patch) | |
tree | b43b67cc999221d30e09cdff07bf2950b39b3315 | |
parent | dacc8d59ba2c13ad236908ac0288709c425a0f82 (diff) | |
download | rabbitmq-server-80975016a35d4752651cc6922d1360fa3e71c430.tar.gz |
unprioritized calls, casts and infos are passed through the prioritize functions
PCalls and PCasts have the priorities assigned to them by the pcall, pcast call.
Everything else gets its priority set by prioritize_call/3,
prioritize_cast/2 or prioritize_info/2.
-rw-r--r-- | src/gen_server2.erl | 77 |
1 files changed, 48 insertions, 29 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 2dde103b..a7600150 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -181,8 +181,8 @@ %% State record -record(gs2_state, {parent, name, state, mod, time, - timeout_state, queue, debug, prioritise_call, - prioritise_cast, prioritise_info}). + timeout_state, queue, debug, prioritize_call, + prioritize_cast, prioritize_info}). %%%========================================================================= %%% Specs. These exist only to shut up dialyzer's warnings @@ -410,21 +410,12 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> Name = name(Name0), Debug = debug_options(Name, Options), Queue = priority_queue:new(), - PrioriCall = function_exported_or_default( - Mod, 'prioritize_call', 3, - fun (_Msg, _From, _State) -> 0 end), - PrioriCast = function_exported_or_default(Mod, 'prioritize_cast', 2, - fun (_Msg, _State) -> 0 end), - PrioriInfo = function_exported_or_default(Mod, 'prioritize_info', 2, - fun (_Msg, _State) -> 0 end), - GS2State = #gs2_state { parent = Parent, - name = Name, - mod = Mod, - queue = Queue, - debug = Debug, - prioritise_call = PrioriCall, - prioritise_cast = PrioriCast, - prioritise_info = PrioriInfo }, + GS2State = find_prioritizers( + #gs2_state { parent = Parent, + name = Name, + mod = Mod, + queue = Queue, + debug = Debug }), case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), @@ -497,11 +488,11 @@ loop(GS2State = #gs2_state { time = hibernate, timeout_state = undefined }) -> pre_hibernate(GS2State); loop(GS2State = #gs2_state { queue = Queue }) -> - process_next_msg(GS2State #gs2_state { queue = drain(Queue) }). + process_next_msg(GS2State #gs2_state { queue = drain(Queue, GS2State) }). -drain(Queue) -> +drain(Queue, GS2State) -> receive - Input -> drain(in(Input, Queue)) + Input -> drain(in(Input, Queue, GS2State), GS2State) after 0 -> Queue end. @@ -531,7 +522,8 @@ process_next_msg(GS2State = #gs2_state { time = Time, Input -> %% Time could be 'hibernate' here, so *don't* call loop process_next_msg( - GS2State #gs2_state { queue = drain(in(Input, Queue1)) }) + GS2State #gs2_state { + queue = drain(in(Input, Queue1, GS2State), GS2State) }) after Time1 -> case HibOnTimeout of true -> @@ -553,7 +545,7 @@ wake_hib(GS2State = #gs2_state { timeout_state = TS, adjust_timeout_state(SleptAt, now(), TimeoutState) end, post_hibernate(GS2State #gs2_state { timeout_state = TimeoutState1, - queue = drain(Queue) }). + queue = drain(Queue, GS2State) }). hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) -> TS = case TimeoutState of @@ -619,13 +611,19 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, CurrentTO1 = Base + Extra, {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. -%% THE MAGIC HAPPENS HERE -in({'$gen_pcast', {Priority, Msg}}, Queue) -> +in({'$gen_pcast', {Priority, Msg}}, Queue, _GS2State) -> priority_queue:in({'$gen_cast', Msg}, Priority, Queue); -in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> +in({'$gen_pcall', From, {Priority, Msg}}, Queue, _GS2State) -> priority_queue:in({'$gen_call', From, Msg}, Priority, Queue); -in(Input, Queue) -> - priority_queue:in(Input, Queue). +in({'$gen_cast', Msg}, Queue, #gs2_state { prioritize_cast = PC, + state = State }) -> + priority_queue:in({'$gen_cast', Msg}, PC(Msg, State), Queue); +in({'$gan_call', From, Msg}, Queue, #gs2_state { prioritize_call = PC, + state = State }) -> + priority_queue:in({'$gen_call', Msg}, PC(Msg, From, State), Queue); +in(Input, Queue, #gs2_state { prioritize_info = PI, + state = State }) -> + priority_queue:in(Input, PI(Input, State), Queue). process_msg(Msg, GS2State = #gs2_state { parent = Parent, @@ -993,7 +991,9 @@ system_code_change(GS2State = #gs2_state { mod = Mod, _Module, OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of {ok, NewState} -> - {ok, [GS2State #gs2_state { state = NewState }]}; + NewGS2State = find_prioritizers( + GS2State #gs2_state { state = NewState }), + {ok, [NewGS2State]}; Else -> Else end. @@ -1159,9 +1159,28 @@ name_to_pid(Name) -> Pid end. +find_prioritizers(GS2State = #gs2_state { mod = Mod }) -> + PrioriCall = function_exported_or_default( + Mod, 'prioritize_call', 3, + fun (_Msg, _From, _State) -> 0 end), + PrioriCast = function_exported_or_default(Mod, 'prioritize_cast', 2, + fun (_Msg, _State) -> 0 end), + PrioriInfo = function_exported_or_default(Mod, 'prioritize_info', 2, + fun (_Msg, _State) -> 0 end), + GS2State #gs2_state { prioritize_call = PrioriCall, + prioritize_cast = PrioriCast, + prioritize_info = PrioriInfo }. + function_exported_or_default(Mod, Fun, Ar, Default) -> case erlang:function_exported(Mod, Fun, Ar) of - true -> fun (Args) -> apply(Mod, Fun, Args) end; + true -> case Ar of + 2 -> fun (Msg, State) -> + Mod:Fun(Msg, State) + end; + 3 -> fun (Msg, From, State) -> + Mod:Fun(Msg, From, State) + end + end; false -> Default end. |