summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-23 15:10:02 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-23 15:10:02 +0100
commit80975016a35d4752651cc6922d1360fa3e71c430 (patch)
treeb43b67cc999221d30e09cdff07bf2950b39b3315
parentdacc8d59ba2c13ad236908ac0288709c425a0f82 (diff)
downloadrabbitmq-server-80975016a35d4752651cc6922d1360fa3e71c430.tar.gz
unprioritized calls, casts and infos are passed through the prioritize functions
PCalls and PCasts have the priorities assigned to them by the pcall, pcast call. Everything else gets its priority set by prioritize_call/3, prioritize_cast/2 or prioritize_info/2.
-rw-r--r--src/gen_server2.erl77
1 files changed, 48 insertions, 29 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 2dde103b..a7600150 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -181,8 +181,8 @@
%% State record
-record(gs2_state, {parent, name, state, mod, time,
- timeout_state, queue, debug, prioritise_call,
- prioritise_cast, prioritise_info}).
+ timeout_state, queue, debug, prioritize_call,
+ prioritize_cast, prioritize_info}).
%%%=========================================================================
%%% Specs. These exist only to shut up dialyzer's warnings
@@ -410,21 +410,12 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = name(Name0),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- PrioriCall = function_exported_or_default(
- Mod, 'prioritize_call', 3,
- fun (_Msg, _From, _State) -> 0 end),
- PrioriCast = function_exported_or_default(Mod, 'prioritize_cast', 2,
- fun (_Msg, _State) -> 0 end),
- PrioriInfo = function_exported_or_default(Mod, 'prioritize_info', 2,
- fun (_Msg, _State) -> 0 end),
- GS2State = #gs2_state { parent = Parent,
- name = Name,
- mod = Mod,
- queue = Queue,
- debug = Debug,
- prioritise_call = PrioriCall,
- prioritise_cast = PrioriCast,
- prioritise_info = PrioriInfo },
+ GS2State = find_prioritizers(
+ #gs2_state { parent = Parent,
+ name = Name,
+ mod = Mod,
+ queue = Queue,
+ debug = Debug }),
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
@@ -497,11 +488,11 @@ loop(GS2State = #gs2_state { time = hibernate,
timeout_state = undefined }) ->
pre_hibernate(GS2State);
loop(GS2State = #gs2_state { queue = Queue }) ->
- process_next_msg(GS2State #gs2_state { queue = drain(Queue) }).
+ process_next_msg(GS2State #gs2_state { queue = drain(Queue, GS2State) }).
-drain(Queue) ->
+drain(Queue, GS2State) ->
receive
- Input -> drain(in(Input, Queue))
+ Input -> drain(in(Input, Queue, GS2State), GS2State)
after 0 -> Queue
end.
@@ -531,7 +522,8 @@ process_next_msg(GS2State = #gs2_state { time = Time,
Input ->
%% Time could be 'hibernate' here, so *don't* call loop
process_next_msg(
- GS2State #gs2_state { queue = drain(in(Input, Queue1)) })
+ GS2State #gs2_state {
+ queue = drain(in(Input, Queue1, GS2State), GS2State) })
after Time1 ->
case HibOnTimeout of
true ->
@@ -553,7 +545,7 @@ wake_hib(GS2State = #gs2_state { timeout_state = TS,
adjust_timeout_state(SleptAt, now(), TimeoutState)
end,
post_hibernate(GS2State #gs2_state { timeout_state = TimeoutState1,
- queue = drain(Queue) }).
+ queue = drain(Queue, GS2State) }).
hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) ->
TS = case TimeoutState of
@@ -619,13 +611,19 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
CurrentTO1 = Base + Extra,
{backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
-%% THE MAGIC HAPPENS HERE
-in({'$gen_pcast', {Priority, Msg}}, Queue) ->
+in({'$gen_pcast', {Priority, Msg}}, Queue, _GS2State) ->
priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
-in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
+in({'$gen_pcall', From, {Priority, Msg}}, Queue, _GS2State) ->
priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
-in(Input, Queue) ->
- priority_queue:in(Input, Queue).
+in({'$gen_cast', Msg}, Queue, #gs2_state { prioritize_cast = PC,
+ state = State }) ->
+ priority_queue:in({'$gen_cast', Msg}, PC(Msg, State), Queue);
+in({'$gan_call', From, Msg}, Queue, #gs2_state { prioritize_call = PC,
+ state = State }) ->
+ priority_queue:in({'$gen_call', Msg}, PC(Msg, From, State), Queue);
+in(Input, Queue, #gs2_state { prioritize_info = PI,
+ state = State }) ->
+ priority_queue:in(Input, PI(Input, State), Queue).
process_msg(Msg,
GS2State = #gs2_state { parent = Parent,
@@ -993,7 +991,9 @@ system_code_change(GS2State = #gs2_state { mod = Mod,
_Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
{ok, NewState} ->
- {ok, [GS2State #gs2_state { state = NewState }]};
+ NewGS2State = find_prioritizers(
+ GS2State #gs2_state { state = NewState }),
+ {ok, [NewGS2State]};
Else ->
Else
end.
@@ -1159,9 +1159,28 @@ name_to_pid(Name) ->
Pid
end.
+find_prioritizers(GS2State = #gs2_state { mod = Mod }) ->
+ PrioriCall = function_exported_or_default(
+ Mod, 'prioritize_call', 3,
+ fun (_Msg, _From, _State) -> 0 end),
+ PrioriCast = function_exported_or_default(Mod, 'prioritize_cast', 2,
+ fun (_Msg, _State) -> 0 end),
+ PrioriInfo = function_exported_or_default(Mod, 'prioritize_info', 2,
+ fun (_Msg, _State) -> 0 end),
+ GS2State #gs2_state { prioritize_call = PrioriCall,
+ prioritize_cast = PrioriCast,
+ prioritize_info = PrioriInfo }.
+
function_exported_or_default(Mod, Fun, Ar, Default) ->
case erlang:function_exported(Mod, Fun, Ar) of
- true -> fun (Args) -> apply(Mod, Fun, Args) end;
+ true -> case Ar of
+ 2 -> fun (Msg, State) ->
+ Mod:Fun(Msg, State)
+ end;
+ 3 -> fun (Msg, From, State) ->
+ Mod:Fun(Msg, From, State)
+ end
+ end;
false -> Default
end.