summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl42
1 files changed, 26 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 43fe3578..92b00db0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -195,7 +195,7 @@ code_change(_OldVsn, State, _Extra) ->
declare(Recover, From, State = #q{q = Q,
backing_queue = BQ,
backing_queue_state = undefined}) ->
- case rabbit_amqqueue:internal_declare(Q, Recover) of
+ case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
#amqqueue{} = Q1 ->
case matches(Recover, Q, Q1) of
true ->
@@ -206,6 +206,7 @@ declare(Recover, From, State = #q{q = Q,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = bq_init(BQ, Q, Recover),
+ recovery_barrier(Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -219,25 +220,34 @@ declare(Recover, From, State = #q{q = Q,
{stop, normal, Err, State}
end.
-matches(true, Q, Q) -> true;
-matches(true, _Q, _Q1) -> false;
-matches(false, Q1, Q2) ->
+matches(new, Q1, Q2) ->
%% i.e. not policy
- Q1#amqqueue.name =:= Q2#amqqueue.name andalso
- Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
- Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
+ Q1#amqqueue.name =:= Q2#amqqueue.name andalso
+ Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
+ Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
- Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
- Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
- Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids.
+ Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
+ Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids;
+matches(_, Q, Q) -> true;
+matches(_, _Q, _Q1) -> false.
bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(Q, Recover,
+ BQ:init(Q, Recover =/= new,
fun (Mod, Fun) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
+recovery_barrier(new) ->
+ ok;
+recovery_barrier(BarrierPid) ->
+ MRef = erlang:monitor(process, BarrierPid),
+ receive
+ {BarrierPid, go} -> erlang:demonitor(MRef, [flush]);
+ {'DOWN', MRef, process, _, _} -> ok
+ end.
+
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
lists:foldl(
fun({Arg, Fun}, State1) ->
@@ -247,9 +257,9 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
end
end, State,
[{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
- {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]).
+ {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
+ {<<"x-message-ttl">>, fun init_ttl/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -1001,9 +1011,9 @@ handle_call({init, Recover}, From,
q = #amqqueue{name = QName} = Q} = State,
gen_server2:reply(From, not_found),
case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
+ new -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName]);
+ _ -> ok
end,
BQS = bq_init(BQ, Q, Recover),
%% Rely on terminate to delete the queue.