diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-07-05 14:47:42 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-07-05 14:47:42 +0100 |
commit | be90fe7b9e23e1b06b7e84301aa382fcc73c2283 (patch) | |
tree | d23e2ccfa34edb33ee0bc95ff78c148f4eda1441 | |
parent | e0af641ee62b45ba88353a77a7026f365a9aa234 (diff) | |
download | rabbitmq-server-be90fe7b9e23e1b06b7e84301aa382fcc73c2283.tar.gz |
Start of queue decorators.
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 15 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 14 | ||||
-rw-r--r-- | src/rabbit_queue_decorator.erl | 34 | ||||
-rw-r--r-- | src/rabbit_registry.erl | 1 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 16 |
7 files changed, 73 insertions, 11 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 4282755d..9fdce2b0 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -45,7 +45,7 @@ -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid, slave_pids, sync_slave_pids, policy, - gm_pids}). + gm_pids, decorators}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7004a353..5331a584 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -79,7 +79,8 @@ -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). -spec(update/2 :: (name(), - fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok'). + fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) + -> 'not_found' | rabbit_types:amqqueue()). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found'); @@ -279,9 +280,10 @@ update(Name, Fun) -> case Durable of true -> ok = mnesia:write(rabbit_durable_queue, Q1, write); _ -> ok - end; + end, + Q1; [] -> - ok + not_found end. store_queue(Q = #amqqueue{durable = true}) -> @@ -295,9 +297,12 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(Q1, Q2) -> +policy_changed(Q1 = #amqqueue{decorators = Decorators1}, + Q2 = #amqqueue{decorators = Decorators2}) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), - rabbit_federation_queue:policy_changed(Q1, Q2), + D1 = rabbit_queue_decorator:select(Decorators1), + D2 = rabbit_queue_decorator:select(Decorators2), + [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)], %% Make sure we emit a stats event even if nothing %% mirroring-related has changed - the policy may have changed anyway. wake_up(Q1). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c4face1f..893659d4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1139,7 +1139,7 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, - State = #q{q = Q = #amqqueue{name = QName}}) -> + State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, State1) of diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 0990c662..28bfb9c2 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -45,7 +45,8 @@ name(#exchange{policy = Policy}) -> name0(Policy). name0(undefined) -> none; name0(Policy) -> pget(name, Policy). -set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; +set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set( + Q#amqqueue{policy = set0(Name)}); set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set( X#exchange{policy = set0(Name)}). @@ -184,9 +185,14 @@ update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> case match(QName, Policies) of OldPolicy -> no_change; - NewPolicy -> rabbit_amqqueue:update( - QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end), - {Q, Q#amqqueue{policy = NewPolicy}} + NewPolicy -> case rabbit_amqqueue:update( + QName, fun(Q1) -> + rabbit_queue_decorator:set( + Q1#amqqueue{policy = NewPolicy}) + end) of + #amqqueue{} = Q1 -> {Q, Q1}; + not_found -> {Q, Q } + end end. notify(no_change)-> diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl new file mode 100644 index 00000000..9ea13beb --- /dev/null +++ b/src/rabbit_queue_decorator.erl @@ -0,0 +1,34 @@ +-module(rabbit_queue_decorator). + +-include("rabbit.hrl"). + +-export([select/1, set/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> + 'ok'. + +-callback active_for(rabbit_types:amqqueue()) -> boolean(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {active_for, 1}, {policy_changed, 2}]; +behaviour_info(_Other) -> + undefined. + +-endif. + +%%---------------------------------------------------------------------------- + +select(Modules) -> + [M || M <- Modules, code:which(M) =/= non_existing]. + +set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}. + +list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)]. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 6aae8de6..5002a03d 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -130,6 +130,7 @@ class_module(exchange) -> rabbit_exchange_type; class_module(auth_mechanism) -> rabbit_auth_mechanism; class_module(runtime_parameter) -> rabbit_runtime_parameter; class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(queue_decorator) -> rabbit_queue_decorator; class_module(policy_validator) -> rabbit_policy_validator; class_module(ha_mode) -> rabbit_mirror_queue_mode. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b7b1635b..9e8c1232 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -44,6 +44,7 @@ -rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). -rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). -rabbit_upgrade({exchange_decorators, mnesia, [policy]}). +-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). %% ------------------------------------------------------------------- @@ -70,6 +71,7 @@ -spec(no_mirror_nodes/0 :: () -> 'ok'). -spec(gm_pids/0 :: () -> 'ok'). -spec(exchange_decorators/0 :: () -> 'ok'). +-spec(queue_decorators/0 :: () -> 'ok'). -endif. @@ -299,6 +301,20 @@ exchange_decorators(Table) -> [name, type, durable, auto_delete, internal, arguments, scratches, policy, decorators]). +queue_decorators() -> + ok = queue_decorators(rabbit_queue), + ok = queue_decorators(rabbit_durable_queue). + +queue_decorators(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, Policy, GmPids}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, Policy, GmPids, []} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, policy, gm_pids, decorators]). %%-------------------------------------------------------------------- |