diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 42 |
1 files changed, 26 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 43fe3578..92b00db0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -195,7 +195,7 @@ code_change(_OldVsn, State, _Extra) -> declare(Recover, From, State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined}) -> - case rabbit_amqqueue:internal_declare(Q, Recover) of + case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of #amqqueue{} = Q1 -> case matches(Recover, Q, Q1) of true -> @@ -206,6 +206,7 @@ declare(Recover, From, State = #q{q = Q, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = bq_init(BQ, Q, Recover), + recovery_barrier(Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -219,25 +220,34 @@ declare(Recover, From, State = #q{q = Q, {stop, normal, Err, State} end. -matches(true, Q, Q) -> true; -matches(true, _Q, _Q1) -> false; -matches(false, Q1, Q2) -> +matches(new, Q1, Q2) -> %% i.e. not policy - Q1#amqqueue.name =:= Q2#amqqueue.name andalso - Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso - Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso + Q1#amqqueue.name =:= Q2#amqqueue.name andalso + Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso + Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso - Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso - Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids. + Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso + Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids; +matches(_, Q, Q) -> true; +matches(_, _Q, _Q1) -> false. bq_init(BQ, Q, Recover) -> Self = self(), - BQ:init(Q, Recover, + BQ:init(Q, Recover =/= new, fun (Mod, Fun) -> rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). +recovery_barrier(new) -> + ok; +recovery_barrier(BarrierPid) -> + MRef = erlang:monitor(process, BarrierPid), + receive + {BarrierPid, go} -> erlang:demonitor(MRef, [flush]); + {'DOWN', MRef, process, _, _} -> ok + end. + process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> lists:foldl( fun({Arg, Fun}, State1) -> @@ -247,9 +257,9 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> end end, State, [{<<"x-expires">>, fun init_expires/2}, - {<<"x-message-ttl">>, fun init_ttl/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, - {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]). + {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, + {<<"x-message-ttl">>, fun init_ttl/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -1001,9 +1011,9 @@ handle_call({init, Recover}, From, q = #amqqueue{name = QName} = Q} = State, gen_server2:reply(From, not_found), case Recover of - true -> ok; - _ -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", [QName]) + new -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]); + _ -> ok end, BQS = bq_init(BQ, Q, Recover), %% Rely on terminate to delete the queue. |