diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-23 13:27:31 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-23 13:27:31 +0100 |
commit | cc4012012d860425781ea1e8da3f8ec6ce8c9c39 (patch) | |
tree | 1e63b1cc1f0ac60f8ae469fb324b14f95c4d4eb6 | |
parent | 1270b265d77799c97af7ec6f16a7637ed4caefda (diff) | |
download | rabbitmq-server-cc4012012d860425781ea1e8da3f8ec6ce8c9c39.tar.gz |
master must broadcast, esp if it doesn't know about the sender. Also rip out the varying priority run_backing_queue* stuff as it turns out it's not needed
-rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 44 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 16 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 28 |
4 files changed, 40 insertions, 67 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0550f13b..8c374ef3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -33,7 +33,6 @@ %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, run_backing_queue_async/3, - run_backing_queue/4, run_backing_queue_async/4, sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1, emit_stats/1]). @@ -150,14 +149,6 @@ -spec(run_backing_queue_async/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(run_backing_queue/4 :: - (pid(), atom(), - (fun ((atom(), A) -> {[rabbit_types:msg_id()], A})), - integer() | 'default') -> 'ok'). --spec(run_backing_queue_async/4 :: - (pid(), atom(), - (fun ((atom(), A) -> {[rabbit_types:msg_id()], A})), - integer() | 'default') -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). @@ -457,16 +448,10 @@ internal_delete(QueueName) -> end). run_backing_queue(QPid, Mod, Fun) -> - run_backing_queue(QPid, Mod, Fun, default). + gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity). run_backing_queue_async(QPid, Mod, Fun) -> - run_backing_queue_async(QPid, Mod, Fun, default). - -run_backing_queue(QPid, Mod, Fun, Priority) -> - gen_server2:call(QPid, {run_backing_queue, Mod, Fun, Priority}, infinity). - -run_backing_queue_async(QPid, Mod, Fun, Priority) -> - gen_server2:cast(QPid, {run_backing_queue, Mod, Fun, Priority}). + gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7daf869b..ea31ec13 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -843,31 +843,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - {run_backing_queue, _Mod, _Fun, default} -> 6; - {run_backing_queue, _Mod, _Fun, Priority} -> Priority; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {run_backing_queue, _Mod, _Fun} -> 6; + _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - delete_immediately -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _AckTags, _ChPid} -> 7; - {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; - {unblock, _ChPid} -> 7; - {run_backing_queue, _Mod, _Fun, default} -> 6; - {run_backing_queue, _Mod, _Fun, Priority} -> Priority; - sync_timeout -> 6; - _ -> 0 + update_ram_duration -> 8; + delete_immediately -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + {ack, _Txn, _AckTags, _ChPid} -> 7; + {reject, _AckTags, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + {run_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + _ -> 0 end. prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, @@ -1081,11 +1079,11 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue_and_run(AckTags, State)) end; -handle_call({run_backing_queue, Mod, Fun, _Priority}, _From, State) -> +handle_call({run_backing_queue, Mod, Fun}, _From, State) -> reply(ok, run_backing_queue(Mod, Fun, State)). -handle_cast({run_backing_queue, Mod, Fun, _Priority}, State) -> +handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); handle_cast(sync_timeout, State) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 0e7f32f0..78c771cc 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -62,22 +62,14 @@ stop() -> sender_death_fun() -> Self = self(), fun (DeadPid) -> - %% Purposefully set the priority to 0 here so that we - %% don't overtake any messages from DeadPid that are - %% already in the queue. rabbit_amqqueue:run_backing_queue_async( Self, ?MODULE, fun (?MODULE, State = #state { gm = GM, known_senders = KS }) -> rabbit_log:info("Master saw death of sender ~p~n", [DeadPid]), - case sets:is_element(DeadPid, KS) of - false -> - State; - true -> - ok = gm:broadcast(GM, {sender_death, DeadPid}), - KS1 = sets:del_element(DeadPid, KS), - State #state { known_senders = KS1 } - end - end, 0) + ok = gm:broadcast(GM, {sender_death, DeadPid}), + KS1 = sets:del_element(DeadPid, KS), + State #state { known_senders = KS1 } + end) end. init(#amqqueue { arguments = Args, name = QName } = Q, Recover, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f065f667..265657de 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -170,14 +170,14 @@ handle_call({gm_deaths, Deaths}, From, {stop, normal, State} end; -handle_call({run_backing_queue, Mod, Fun, _Priority}, _From, State) -> +handle_call({run_backing_queue, Mod, Fun}, _From, State) -> reply(ok, run_backing_queue(Mod, Fun, State)); handle_call({commit, _Txn, _ChPid}, _From, State) -> %% We don't support transactions in mirror queues reply(ok, State). -handle_cast({run_backing_queue, Mod, Fun, _Priority}, State) -> +handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); handle_cast({gm, Instruction}, State) -> @@ -265,23 +265,21 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _State) -> case Msg of - {run_backing_queue, _Mod, _Fun, default} -> 6; - {run_backing_queue, _Mod, _Fun, Priority} -> Priority; - {gm_deaths, _Deaths} -> 5; - _ -> 0 + {run_backing_queue, _Mod, _Fun} -> 6; + {gm_deaths, _Deaths} -> 5; + _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - {run_backing_queue, _Mod, _Fun, default} -> 6; - {run_backing_queue, _Mod, _Fun, Priority} -> Priority; - sync_timeout -> 6; - {gm, _Msg} -> 5; - {post_commit, _Txn, _AckTags} -> 4; - _ -> 0 + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + {run_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + {gm, _Msg} -> 5; + {post_commit, _Txn, _AckTags} -> 4; + _ -> 0 end. %% --------------------------------------------------------------------------- |