diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 11bb66d7..ba8becfc 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -16,6 +16,11 @@
%% The original code could reorder messages when communicating with a
%% process on a remote node that was not currently connected.
+%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
+%% allow callers to attach priorities to requests. Requests with
+%% higher priorities are processed before requests with lower
+%% priorities. The default priority is 0.
%% All modifications are (C) 2009 LShift Ltd.
%% ``The contents of this file are subject to the Erlang Public License,
@@ -107,8 +112,8 @@
%% API
-export([start/3, start/4,
start_link/3, start_link/4,
- call/2, call/3,
- cast/2, reply/2,
+ call/2, call/3, pcall/3, pcall/4,
+ cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
enter_loop/3, enter_loop/4, enter_loop/5]).
@@ -188,6 +193,22 @@ call(Name, Request, Timeout) ->
exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
+pcall(Name, Priority, Request) ->
+ case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}})
+ end.
+pcall(Name, Priority, Request, Timeout) ->
+ case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}})
+ end.
%% -----------------------------------------------------------------
%% Make a cast to a generic server.
%% -----------------------------------------------------------------
@@ -207,6 +228,22 @@ do_cast(Dest, Request) ->
cast_msg(Request) -> {'$gen_cast',Request}.
+pcast({global,Name}, Priority, Request) ->
+ catch global:send(Name, cast_msg(Priority, Request)),
+ ok;
+pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) ->
+ do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_atom(Dest) ->
+ do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_pid(Dest) ->
+ do_cast(Dest, Priority, Request).
+do_cast(Dest, Priority, Request) ->
+ do_send(Dest, cast_msg(Priority, Request)),
+ ok.
+cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
%% -----------------------------------------------------------------
%% Send a reply to the client.
%% -----------------------------------------------------------------
@@ -276,7 +313,7 @@ enter_loop(Mod, Options, State, ServerName, Timeout) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
- Queue = queue:new(),
+ Queue = priority_queue:new(),
loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
@@ -294,7 +331,7 @@ init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name, Mod, Args, Options) ->
Debug = debug_options(Name, Options),
- Queue = queue:new(),
+ Queue = priority_queue:new(),
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
@@ -326,9 +363,9 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
Input -> loop(Parent, Name, State, Mod,
- Time, queue:in(Input, Queue), Debug)
+ Time, in(Input, Queue), Debug)
after 0 ->
- case queue:out(Queue) of
+ case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
Time, Queue1, Debug, Msg);
@@ -336,14 +373,21 @@ loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
Input ->
loop(Parent, Name, State, Mod,
- Time, queue:in(Input, Queue1), Debug)
+ Time, in(Input, Queue1), Debug)
after Time ->
process_msg(Parent, Name, State, Mod,
Time, Queue1, Debug, timeout)
+in({'$gen_pcast', {Priority, Msg}}, Queue) ->
+ priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
+in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
+ priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
+in(Input, Queue) ->
+ priority_queue:in(Input, Queue).
process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) ->
case Msg of
{system, From, Req} ->
@@ -850,5 +894,5 @@ format_status(Opt, StatusData) ->
{data, [{"Status", SysState},
{"Parent", Parent},
{"Logged events", Log},
- {"Queued messages", queue:to_list(Queue)}]} |
+ {"Queued messages", priority_queue:to_list(Queue)}]} |
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
new file mode 100644
index 00000000..b872c8d5
--- /dev/null
+++ b/src/priority_queue.erl
@@ -0,0 +1,151 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%% The Original Code is RabbitMQ.
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% All Rights Reserved.
+%% Contributor(s): ______________________________________.
+%% Priority queues have essentially the same interface as ordinary
+%% queues, except that a) there is an in/3 that takes a priority, and
+%% b) we have only implemented the core API we need.
+%% Priorities should be integers - the higher the value the higher the
+%% priority - but we don't actually check that.
+%% in/2 inserts items with priority 0.
+%% We optimise the case where a priority queue is being used just like
+%% an ordinary queue. When that is the case we represent the priority
+%% queue as an ordinary queue. We could just call into the 'queue'
+%% module for that, but for efficiency we implement the relevant
+%% functions directly in here, thus saving on inter-module calls and
+%% eliminating a level of boxing.
+%% When the queue contains items with non-zero priorities, it is
+%% represented as a sorted kv list with the inverted Priority as the
+%% key and an ordinary queue as the value. Here again we use our own
+%% ordinary queue implemention for efficiency, often making recursive
+%% calls into the same function knowing that ordinary queues represent
+%% a base case.
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+-type(priority() :: integer()).
+-type(squeue() :: {queue, [any()], [any()]}).
+-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
+-spec(new/0 :: () -> pqueue()).
+-spec(is_queue/1 :: (any()) -> bool()).
+-spec(is_empty/1 :: (pqueue()) -> bool()).
+-spec(len/1 :: (pqueue()) -> non_neg_integer()).
+-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
+-spec(in/2 :: (any(), pqueue()) -> pqueue()).
+-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
+-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+new() ->
+ {queue, [], []}.
+is_queue({queue, R, F}) when is_list(R), is_list(F) ->
+ true;
+is_queue({pqueue, Queues}) when is_list(Queues) ->
+ lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end,
+ Queues);
+is_queue(_) ->
+ false.
+is_empty({queue, [], []}) ->
+ true;
+is_empty(_) ->
+ false.
+len({queue, R, F}) when is_list(R), is_list(F) ->
+ length(R) + length(F);
+len({pqueue, Queues}) ->
+ lists:sum([len(Q) || {_, Q} <- Queues]).
+to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
+ [{0, V} || V <- Out ++ lists:reverse(In, [])];
+to_list({pqueue, Queues}) ->
+ [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)].
+in(Item, Q) ->
+ in(Item, 0, Q).
+in(X, 0, {queue, [_] = In, []}) ->
+ {queue, [X], In};
+in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
+ {queue, [X|In], Out};
+in(X, Priority, Q = {queue, _, _}) ->
+ in(X, Priority, {pqueue, [{0, Q}]});
+in(X, Priority, {pqueue, Queues}) ->
+ P = -Priority,
+ {pqueue, case lists:keysearch(P, 1, Queues) of
+ {value, {_, Q}} ->
+ lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
+ false ->
+ lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ end}.
+out({queue, [], []} = Q) ->
+ {empty, Q};
+out({queue, [V], []}) ->
+ {{value, V}, {queue, [], []}};
+out({queue, [Y|In], []}) ->
+ [V|Out] = lists:reverse(In, []),
+ {{value, V}, {queue, [Y], Out}};
+out({queue, In, [V]}) when is_list(In) ->
+ {{value,V}, r2f(In)};
+out({queue, In,[V|Out]}) when is_list(In) ->
+ {{value, V}, {queue, In, Out}};
+out({pqueue, [{P, Q} | Queues]}) ->
+ {R, Q1} = out(Q),
+ NewQ = case is_empty(Q1) of
+ true -> case Queues of
+ [] -> {queue, [], []};
+ [{0, OnlyQ}] -> OnlyQ;
+ [_|_] -> {pqueue, Queues}
+ end;
+ false -> {pqueue, [{P, Q1} | Queues]}
+ end,
+ {R, NewQ}.
+r2f([]) -> {queue, [], []};
+r2f([_] = R) -> {queue, [], R};
+r2f([X,Y]) -> {queue, [X], [Y]};
+r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fdc99c38..06bb18f5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -213,10 +213,10 @@ list(VHostPath) ->
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server2:call(QPid, info, infinity).
+ gen_server2:pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server2:call(QPid, {info, Items}, infinity) of
+ case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6312e8e3..946b8c31 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -45,6 +45,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
+ passed = test_priority_queue(),
passed = test_parsing(),
passed = test_topic_matching(),
passed = test_log_management(),
@@ -55,6 +56,58 @@ all_tests() ->
passed = test_server_status(),
+test_priority_queue() ->
+ false = priority_queue:is_queue(not_a_queue),
+ %% empty Q
+ Q = priority_queue:new(),
+ {true, true, 0, [], []} = test_priority_queue(Q),
+ %% 1-4 element no-priority Q
+ true = lists:all(fun (X) -> X =:= passed end,
+ lists:map(fun test_simple_n_element_queue/1,
+ lists:seq(1, 4))),
+ %% 1-element priority Q
+ Q1 = priority_queue:in(foo, 1, priority_queue:new()),
+ {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
+ %% 2-element same-priority Q
+ Q2 = priority_queue:in(bar, 1, Q1),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q2),
+ %% 2-element different-priority Q
+ Q3 = priority_queue:in(bar, 2, Q1),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q3),
+ passed.
+priority_queue_in_all(Q, L) ->
+ lists:foldl(fun (X, Acc) -> priority_queue:in(X, Acc) end, Q, L).
+priority_queue_out_all(Q) ->
+ case priority_queue:out(Q) of
+ {empty, _} -> [];
+ {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)]
+ end.
+test_priority_queue(Q) ->
+ {priority_queue:is_queue(Q),
+ priority_queue:is_empty(Q),
+ priority_queue:len(Q),
+ priority_queue:to_list(Q),
+ priority_queue_out_all(Q)}.
+test_simple_n_element_queue(N) ->
+ Items = lists:seq(1, N),
+ Q = priority_queue_in_all(priority_queue:new(), Items),
+ ToListRes = [{0, X} || X <- Items],
+ {true, false, N, ToListRes, Items} = test_priority_queue(Q),
+ passed.
test_parsing() ->
passed = test_content_properties(),