summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-09-03 16:12:05 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-09-03 16:12:05 +0100
commit2e0e201e7336fc9381b8e3a29579d5399fdfa42e (patch)
treebaf1defe25ece92d34e3ae4c74bb6af12a0cb533
parent5c0b1272675c8a78b714d2cf17e93a3f34db65d9 (diff)
downloadrabbitmq-server-2e0e201e7336fc9381b8e3a29579d5399fdfa42e.tar.gz
Dead lettering by policy.
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl40
-rw-r--r--src/rabbit_mirror_queue_misc.erl2
-rw-r--r--src/rabbit_policies.erl17
4 files changed, 49 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 31c0268f..e97759cf 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -24,7 +24,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([force_event_refresh/0, wake_up/1]).
+-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/4, basic_consume/10, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
@@ -111,7 +111,7 @@
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
-spec(force_event_refresh/0 :: () -> 'ok').
--spec(wake_up/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 ::
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean()}]).
@@ -298,7 +298,7 @@ policy_changed(Q1, Q2) ->
rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
%% Make sure we emit a stats event even if nothing
%% mirroring-related has changed - the policy may have changed anyway.
- wake_up(Q1).
+ notify_policy_changed(Q1).
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
@@ -500,7 +500,8 @@ force_event_refresh(QNames) ->
force_event_refresh(Failed)
end.
-wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up).
+notify_policy_changed(#amqqueue{pid = QPid}) ->
+ gen_server2:cast(QPid, policy_changed).
consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 972e6be0..8c6387a3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -136,7 +136,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
senders = Senders,
msg_id_to_channel = MTC},
- State2 = process_args(State1),
+ State2 = process_args_policy(State1),
lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries).
@@ -196,8 +196,9 @@ declare(Recover, From, State = #q{q = Q,
BQ = backing_queue_module(Q1),
BQS = bq_init(BQ, Q, Recover),
recovery_barrier(Recover),
- State1 = process_args(State#q{backing_queue = BQ,
- backing_queue_state = BQS}),
+ State1 = process_args_policy(
+ State#q{backing_queue = BQ,
+ backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -238,14 +239,23 @@ recovery_barrier(BarrierPid) ->
{'DOWN', MRef, process, _, _} -> ok
end.
-process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
+process_args_policy(State0 = #q{q = Q = #amqqueue{arguments = Arguments}}) ->
+ State1 = lists:foldl(
+ fun({Arg, Fun}, StateN) ->
+ case rabbit_policy:get(Arg, Q) of
+ undefined -> StateN;
+ Val -> Fun(Val, StateN)
+ end
+ end, State0,
+ [{<<"dead-letter-exchange">>, fun init_dlx/2},
+ {<<"dead-letter-routing-key">>, fun init_dlx_routing_key/2}]),
lists:foldl(
- fun({Arg, Fun}, State1) ->
+ fun({Arg, Fun}, StateN) ->
case rabbit_misc:table_lookup(Arguments, Arg) of
- {_Type, Val} -> Fun(Val, State1);
- undefined -> State1
+ {_Type, Val} -> Fun(Val, StateN);
+ undefined -> StateN
end
- end, State,
+ end, State1,
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
@@ -959,8 +969,7 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
ExclusiveOwner;
-i(policy, #q{q = #amqqueue{name = Name}}) ->
- {ok, Q} = rabbit_amqqueue:lookup(Name),
+i(policy, #q{q = Q}) ->
case rabbit_policy:name(Q) of
none -> '';
Policy -> Policy
@@ -1388,8 +1397,15 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
end
end);
-handle_cast(wake_up, State) ->
- noreply(State).
+handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
+ %% We depend on the #q.q field being up to date at least WRT
+ %% policy (but not slave pids) in various places, so when it
+ %% changes we go and read it from Mnesia again.
+ %%
+ %% This also has the side effect of waking us up so we emit a
+ %% stats event - so event consumers see the changed policy.
+ {ok, Q} = rabbit_amqqueue:lookup(Name),
+ noreply(process_args_policy(State#q{q = Q})).
handle_info(maybe_expire, State) ->
case is_unused(State) of
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index eded0411..2d55cad4 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -221,7 +221,7 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
Q1 = Q#amqqueue{sync_slave_pids = SSPids1},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
- rabbit_amqqueue:wake_up(Q1),
+ rabbit_amqqueue:notify_policy_changed(Q1),
Q1.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 20f461bc..a77e1e7b 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -29,12 +29,25 @@
register() ->
[rabbit_registry:register(Class, Name, ?MODULE) ||
- {Class, Name} <- [{policy_validator, <<"alternate-exchange">>}]],
+ {Class, Name} <- [{policy_validator, <<"alternate-exchange">>},
+ {policy_validator, <<"dead-letter-exchange">>},
+ {policy_validator, <<"dead-letter-routing-key">>}]],
ok.
validate_policy([{<<"alternate-exchange">>, Value}])
when is_binary(Value) ->
ok;
validate_policy([{<<"alternate-exchange">>, Value}]) ->
- {error, "~p is not a valid alternate exchange name", [Value]}.
+ {error, "~p is not a valid alternate exchange name", [Value]};
+validate_policy([{<<"dead-letter-exchange">>, Value}])
+ when is_binary(Value) ->
+ ok;
+validate_policy([{<<"dead-letter-exchange">>, Value}]) ->
+ {error, "~p is not a valid dead letter exchange name", [Value]};
+
+validate_policy([{<<"dead-letter-routing-key">>, Value}])
+ when is_binary(Value) ->
+ ok;
+validate_policy([{<<"dead-letter-routing-key">>, Value}]) ->
+ {error, "~p is not a valid dead letter routing key", [Value]}.