summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-05-23 13:27:31 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-05-23 13:27:31 +0100
commitcc4012012d860425781ea1e8da3f8ec6ce8c9c39 (patch)
tree1e63b1cc1f0ac60f8ae469fb324b14f95c4d4eb6
parent1270b265d77799c97af7ec6f16a7637ed4caefda (diff)
downloadrabbitmq-server-cc4012012d860425781ea1e8da3f8ec6ce8c9c39.tar.gz
master must broadcast, esp if it doesn't know about the sender. Also rip out the varying priority run_backing_queue* stuff as it turns out it's not needed
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_amqqueue_process.erl44
-rw-r--r--src/rabbit_mirror_queue_master.erl16
-rw-r--r--src/rabbit_mirror_queue_slave.erl28
4 files changed, 40 insertions, 67 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0550f13b..8c374ef3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -33,7 +33,6 @@
%% internal
-export([internal_declare/2, internal_delete/1,
run_backing_queue/3, run_backing_queue_async/3,
- run_backing_queue/4, run_backing_queue_async/4,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
emit_stats/1]).
@@ -150,14 +149,6 @@
-spec(run_backing_queue_async/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(run_backing_queue/4 ::
- (pid(), atom(),
- (fun ((atom(), A) -> {[rabbit_types:msg_id()], A})),
- integer() | 'default') -> 'ok').
--spec(run_backing_queue_async/4 ::
- (pid(), atom(),
- (fun ((atom(), A) -> {[rabbit_types:msg_id()], A})),
- integer() | 'default') -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -457,16 +448,10 @@ internal_delete(QueueName) ->
end).
run_backing_queue(QPid, Mod, Fun) ->
- run_backing_queue(QPid, Mod, Fun, default).
+ gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity).
run_backing_queue_async(QPid, Mod, Fun) ->
- run_backing_queue_async(QPid, Mod, Fun, default).
-
-run_backing_queue(QPid, Mod, Fun, Priority) ->
- gen_server2:call(QPid, {run_backing_queue, Mod, Fun, Priority}, infinity).
-
-run_backing_queue_async(QPid, Mod, Fun, Priority) ->
- gen_server2:cast(QPid, {run_backing_queue, Mod, Fun, Priority}).
+ gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7daf869b..ea31ec13 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -843,31 +843,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {run_backing_queue, _Mod, _Fun, default} -> 6;
- {run_backing_queue, _Mod, _Fun, Priority} -> Priority;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _AckTags, _ChPid} -> 7;
- {reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- {run_backing_queue, _Mod, _Fun, default} -> 6;
- {run_backing_queue, _Mod, _Fun, Priority} -> Priority;
- sync_timeout -> 6;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {reject, _AckTags, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -1081,11 +1079,11 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({run_backing_queue, Mod, Fun, _Priority}, _From, State) ->
+handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
reply(ok, run_backing_queue(Mod, Fun, State)).
-handle_cast({run_backing_queue, Mod, Fun, _Priority}, State) ->
+handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
handle_cast(sync_timeout, State) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 0e7f32f0..78c771cc 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -62,22 +62,14 @@ stop() ->
sender_death_fun() ->
Self = self(),
fun (DeadPid) ->
- %% Purposefully set the priority to 0 here so that we
- %% don't overtake any messages from DeadPid that are
- %% already in the queue.
rabbit_amqqueue:run_backing_queue_async(
Self, ?MODULE,
fun (?MODULE, State = #state { gm = GM, known_senders = KS }) ->
rabbit_log:info("Master saw death of sender ~p~n", [DeadPid]),
- case sets:is_element(DeadPid, KS) of
- false ->
- State;
- true ->
- ok = gm:broadcast(GM, {sender_death, DeadPid}),
- KS1 = sets:del_element(DeadPid, KS),
- State #state { known_senders = KS1 }
- end
- end, 0)
+ ok = gm:broadcast(GM, {sender_death, DeadPid}),
+ KS1 = sets:del_element(DeadPid, KS),
+ State #state { known_senders = KS1 }
+ end)
end.
init(#amqqueue { arguments = Args, name = QName } = Q, Recover,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f065f667..265657de 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -170,14 +170,14 @@ handle_call({gm_deaths, Deaths}, From,
{stop, normal, State}
end;
-handle_call({run_backing_queue, Mod, Fun, _Priority}, _From, State) ->
+handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
reply(ok, run_backing_queue(Mod, Fun, State));
handle_call({commit, _Txn, _ChPid}, _From, State) ->
%% We don't support transactions in mirror queues
reply(ok, State).
-handle_cast({run_backing_queue, Mod, Fun, _Priority}, State) ->
+handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
handle_cast({gm, Instruction}, State) ->
@@ -265,23 +265,21 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _State) ->
case Msg of
- {run_backing_queue, _Mod, _Fun, default} -> 6;
- {run_backing_queue, _Mod, _Fun, Priority} -> Priority;
- {gm_deaths, _Deaths} -> 5;
- _ -> 0
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ {gm_deaths, _Deaths} -> 5;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- {run_backing_queue, _Mod, _Fun, default} -> 6;
- {run_backing_queue, _Mod, _Fun, Priority} -> Priority;
- sync_timeout -> 6;
- {gm, _Msg} -> 5;
- {post_commit, _Txn, _AckTags} -> 4;
- _ -> 0
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ {gm, _Msg} -> 5;
+ {post_commit, _Txn, _AckTags} -> 4;
+ _ -> 0
end.
%% ---------------------------------------------------------------------------