summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-05 14:47:42 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-05 14:47:42 +0100
commitbe90fe7b9e23e1b06b7e84301aa382fcc73c2283 (patch)
treed23e2ccfa34edb33ee0bc95ff78c148f4eda1441
parente0af641ee62b45ba88353a77a7026f365a9aa234 (diff)
downloadrabbitmq-server-be90fe7b9e23e1b06b7e84301aa382fcc73c2283.tar.gz
Start of queue decorators.
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl15
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_policy.erl14
-rw-r--r--src/rabbit_queue_decorator.erl34
-rw-r--r--src/rabbit_registry.erl1
-rw-r--r--src/rabbit_upgrade_functions.erl16
7 files changed, 73 insertions, 11 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 4282755d..9fdce2b0 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -45,7 +45,7 @@
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
arguments, pid, slave_pids, sync_slave_pids, policy,
- gm_pids}).
+ gm_pids, decorators}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7004a353..5331a584 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -79,7 +79,8 @@
-> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
-spec(update/2 ::
(name(),
- fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
+ fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue()))
+ -> 'not_found' | rabbit_types:amqqueue()).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
@@ -279,9 +280,10 @@ update(Name, Fun) ->
case Durable of
true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
_ -> ok
- end;
+ end,
+ Q1;
[] ->
- ok
+ not_found
end.
store_queue(Q = #amqqueue{durable = true}) ->
@@ -295,9 +297,12 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(Q1, Q2) ->
+policy_changed(Q1 = #amqqueue{decorators = Decorators1},
+ Q2 = #amqqueue{decorators = Decorators2}) ->
rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
- rabbit_federation_queue:policy_changed(Q1, Q2),
+ D1 = rabbit_queue_decorator:select(Decorators1),
+ D2 = rabbit_queue_decorator:select(Decorators2),
+ [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)],
%% Make sure we emit a stats event even if nothing
%% mirroring-related has changed - the policy may have changed anyway.
wake_up(Q1).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c4face1f..893659d4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1139,7 +1139,7 @@ handle_call({notify_down, ChPid}, _From, State) ->
end;
handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
- State = #q{q = Q = #amqqueue{name = QName}}) ->
+ State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
case fetch(AckRequired, State1) of
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 0990c662..28bfb9c2 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -45,7 +45,8 @@ name(#exchange{policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set(
+ Q#amqqueue{policy = set0(Name)});
set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
X#exchange{policy = set0(Name)}).
@@ -184,9 +185,14 @@ update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
case match(QName, Policies) of
OldPolicy -> no_change;
- NewPolicy -> rabbit_amqqueue:update(
- QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end),
- {Q, Q#amqqueue{policy = NewPolicy}}
+ NewPolicy -> case rabbit_amqqueue:update(
+ QName, fun(Q1) ->
+ rabbit_queue_decorator:set(
+ Q1#amqqueue{policy = NewPolicy})
+ end) of
+ #amqqueue{} = Q1 -> {Q, Q1};
+ not_found -> {Q, Q }
+ end
end.
notify(no_change)->
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
new file mode 100644
index 00000000..9ea13beb
--- /dev/null
+++ b/src/rabbit_queue_decorator.erl
@@ -0,0 +1,34 @@
+-module(rabbit_queue_decorator).
+
+-include("rabbit.hrl").
+
+-export([select/1, set/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
+ 'ok'.
+
+-callback active_for(rabbit_types:amqqueue()) -> boolean().
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {active_for, 1}, {policy_changed, 2}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+select(Modules) ->
+ [M || M <- Modules, code:which(M) =/= non_existing].
+
+set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 6aae8de6..5002a03d 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -130,6 +130,7 @@ class_module(exchange) -> rabbit_exchange_type;
class_module(auth_mechanism) -> rabbit_auth_mechanism;
class_module(runtime_parameter) -> rabbit_runtime_parameter;
class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(queue_decorator) -> rabbit_queue_decorator;
class_module(policy_validator) -> rabbit_policy_validator;
class_module(ha_mode) -> rabbit_mirror_queue_mode.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b7b1635b..9e8c1232 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -44,6 +44,7 @@
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
+-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
%% -------------------------------------------------------------------
@@ -70,6 +71,7 @@
-spec(no_mirror_nodes/0 :: () -> 'ok').
-spec(gm_pids/0 :: () -> 'ok').
-spec(exchange_decorators/0 :: () -> 'ok').
+-spec(queue_decorators/0 :: () -> 'ok').
-endif.
@@ -299,6 +301,20 @@ exchange_decorators(Table) ->
[name, type, durable, auto_delete, internal, arguments, scratches, policy,
decorators]).
+queue_decorators() ->
+ ok = queue_decorators(rabbit_queue),
+ ok = queue_decorators(rabbit_durable_queue).
+
+queue_decorators(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids, []}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, policy, gm_pids, decorators]).
%%--------------------------------------------------------------------