diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-09-03 16:12:05 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-09-03 16:12:05 +0100 |
commit | 2e0e201e7336fc9381b8e3a29579d5399fdfa42e (patch) | |
tree | baf1defe25ece92d34e3ae4c74bb6af12a0cb533 | |
parent | 5c0b1272675c8a78b714d2cf17e93a3f34db65d9 (diff) | |
download | rabbitmq-server-2e0e201e7336fc9381b8e3a29579d5399fdfa42e.tar.gz |
Dead lettering by policy.
-rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 40 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_policies.erl | 17 |
4 files changed, 49 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 31c0268f..e97759cf 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -24,7 +24,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([force_event_refresh/0, wake_up/1]). +-export([force_event_refresh/0, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/4, basic_consume/10, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). @@ -111,7 +111,7 @@ -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(force_event_refresh/0 :: () -> 'ok'). --spec(wake_up/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean()}]). @@ -298,7 +298,7 @@ policy_changed(Q1, Q2) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), %% Make sure we emit a stats event even if nothing %% mirroring-related has changed - the policy may have changed anyway. - wake_up(Q1). + notify_policy_changed(Q1). start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), @@ -500,7 +500,8 @@ force_event_refresh(QNames) -> force_event_refresh(Failed) end. -wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up). +notify_policy_changed(#amqqueue{pid = QPid}) -> + gen_server2:cast(QPid, policy_changed). consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 972e6be0..8c6387a3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -136,7 +136,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, senders = Senders, msg_id_to_channel = MTC}, - State2 = process_args(State1), + State2 = process_args_policy(State1), lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) end, State2, Deliveries). @@ -196,8 +196,9 @@ declare(Recover, From, State = #q{q = Q, BQ = backing_queue_module(Q1), BQS = bq_init(BQ, Q, Recover), recovery_barrier(Recover), - State1 = process_args(State#q{backing_queue = BQ, - backing_queue_state = BQS}), + State1 = process_args_policy( + State#q{backing_queue = BQ, + backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -238,14 +239,23 @@ recovery_barrier(BarrierPid) -> {'DOWN', MRef, process, _, _} -> ok end. -process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> +process_args_policy(State0 = #q{q = Q = #amqqueue{arguments = Arguments}}) -> + State1 = lists:foldl( + fun({Arg, Fun}, StateN) -> + case rabbit_policy:get(Arg, Q) of + undefined -> StateN; + Val -> Fun(Val, StateN) + end + end, State0, + [{<<"dead-letter-exchange">>, fun init_dlx/2}, + {<<"dead-letter-routing-key">>, fun init_dlx_routing_key/2}]), lists:foldl( - fun({Arg, Fun}, State1) -> + fun({Arg, Fun}, StateN) -> case rabbit_misc:table_lookup(Arguments, Arg) of - {_Type, Val} -> Fun(Val, State1); - undefined -> State1 + {_Type, Val} -> Fun(Val, StateN); + undefined -> StateN end - end, State, + end, State1, [{<<"x-expires">>, fun init_expires/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, @@ -959,8 +969,7 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> ExclusiveOwner; -i(policy, #q{q = #amqqueue{name = Name}}) -> - {ok, Q} = rabbit_amqqueue:lookup(Name), +i(policy, #q{q = Q}) -> case rabbit_policy:name(Q) of none -> ''; Policy -> Policy @@ -1388,8 +1397,15 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, end end); -handle_cast(wake_up, State) -> - noreply(State). +handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> + %% We depend on the #q.q field being up to date at least WRT + %% policy (but not slave pids) in various places, so when it + %% changes we go and read it from Mnesia again. + %% + %% This also has the side effect of waking us up so we emit a + %% stats event - so event consumers see the changed policy. + {ok, Q} = rabbit_amqqueue:lookup(Name), + noreply(process_args_policy(State#q{q = Q})). handle_info(maybe_expire, State) -> case is_unused(State) of diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index eded0411..2d55cad4 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -221,7 +221,7 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids, Q1 = Q#amqqueue{sync_slave_pids = SSPids1}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event - rabbit_amqqueue:wake_up(Q1), + rabbit_amqqueue:notify_policy_changed(Q1), Q1. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 20f461bc..a77e1e7b 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -29,12 +29,25 @@ register() -> [rabbit_registry:register(Class, Name, ?MODULE) || - {Class, Name} <- [{policy_validator, <<"alternate-exchange">>}]], + {Class, Name} <- [{policy_validator, <<"alternate-exchange">>}, + {policy_validator, <<"dead-letter-exchange">>}, + {policy_validator, <<"dead-letter-routing-key">>}]], ok. validate_policy([{<<"alternate-exchange">>, Value}]) when is_binary(Value) -> ok; validate_policy([{<<"alternate-exchange">>, Value}]) -> - {error, "~p is not a valid alternate exchange name", [Value]}. + {error, "~p is not a valid alternate exchange name", [Value]}; +validate_policy([{<<"dead-letter-exchange">>, Value}]) + when is_binary(Value) -> + ok; +validate_policy([{<<"dead-letter-exchange">>, Value}]) -> + {error, "~p is not a valid dead letter exchange name", [Value]}; + +validate_policy([{<<"dead-letter-routing-key">>, Value}]) + when is_binary(Value) -> + ok; +validate_policy([{<<"dead-letter-routing-key">>, Value}]) -> + {error, "~p is not a valid dead letter routing key", [Value]}. |