summaryrefslogtreecommitdiff
path: root/src/rabbit_binding.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-04-08 16:54:06 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-04-08 16:54:06 +0100
commitcba6ad4fb962117040d243281dd1c0773eda2b42 (patch)
tree8cfe3ba75fe9f17e5aea5717a198d68af3f986d8 /src/rabbit_binding.erl
parent9579f96dd9507569d4e4f6576673137e3a329193 (diff)
parentd76f7ffdf5aa02bb1031c9cf1891791607db25a0 (diff)
downloadrabbitmq-server-cba6ad4fb962117040d243281dd1c0773eda2b42.tar.gz
Merge in bug24009
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r--src/rabbit_binding.erl73
1 files changed, 42 insertions, 31 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e4827526..84584a1c 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -21,7 +21,7 @@
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
- process_deletions/2]).
+ process_deletions/1]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
@@ -44,9 +44,9 @@
rabbit_types:exchange() | rabbit_types:amqqueue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-type(bindings() :: [rabbit_types:binding()]).
--type(add_res() :: bind_res() | rabbit_misc:const(bind_res())).
+-type(add_res() :: bind_res() | rabbit_misc:thunk(bind_res())).
-type(bind_or_error() :: bind_res() | rabbit_types:error('binding_not_found')).
--type(remove_res() :: bind_or_error() | rabbit_misc:const(bind_or_error())).
+-type(remove_res() :: bind_or_error() | rabbit_misc:thunk(bind_or_error())).
-opaque(deletions() :: dict()).
@@ -78,7 +78,7 @@
(rabbit_types:binding_destination()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
--spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok').
+-spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')).
-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
-spec(add_deletion/3 :: (rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
@@ -156,15 +156,18 @@ add(Src, Dst, B) ->
mnesia:read({rabbit_durable_route, B}) =:= []) of
true -> ok = sync_binding(B, Durable, durable(Dst),
fun mnesia:write/3),
- fun (Tx) ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Tx, Src, B]),
- rabbit_event:notify_if(
- not Tx, binding_created, info(B))
- end;
+ add_notify(Src, B);
false -> rabbit_misc:const(not_found)
end;
- [_] -> fun rabbit_misc:const_ok/1
+ [_] -> fun rabbit_misc:const_ok/0
+ end.
+
+add_notify(X, B) ->
+ ok = rabbit_exchange:callback(X, add_binding, [transaction, X, B]),
+ Serial = rabbit_exchange:serial(X),
+ fun () ->
+ ok = rabbit_exchange:callback(X, add_binding, [Serial, X, B]),
+ ok = rabbit_event:notify(binding_created, info(B))
end.
remove(Binding, InnerFun) ->
@@ -189,10 +192,8 @@ remove(Binding, InnerFun) ->
end
end,
case Result of
- {error, _} = Err ->
- rabbit_misc:const(Err);
- {ok, Deletions} ->
- fun (Tx) -> ok = process_deletions(Deletions, Tx) end
+ {error, _} = Err -> rabbit_misc:const(Err);
+ {ok, Deletions} -> process_deletions(Deletions)
end
end).
@@ -443,19 +444,29 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions, Tx) ->
- dict:fold(
- fun (_XName, {X, Deleted, Bindings}, ok) ->
- FlatBindings = lists:flatten(Bindings),
- [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) ||
- B <- FlatBindings],
- case Deleted of
- not_deleted ->
- rabbit_exchange:callback(X, remove_bindings,
- [Tx, X, FlatBindings]);
- deleted ->
- rabbit_event:notify_if(not Tx, exchange_deleted,
- [{name, X#exchange.name}]),
- rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings])
- end
- end, ok, Deletions).
+process_deletions(Deletions) ->
+ AugmentedDeletions =
+ dict:map(fun (_XName, {X, deleted, Bindings}) ->
+ Bs = lists:flatten(Bindings),
+ x_callback(transaction, X, delete, Bs),
+ {X, deleted, Bs, none};
+ (_XName, {X, not_deleted, Bindings}) ->
+ Bs = lists:flatten(Bindings),
+ x_callback(transaction, X, remove_bindings, Bs),
+ {X, not_deleted, Bs, rabbit_exchange:serial(X)}
+ end, Deletions),
+ fun() ->
+ dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) ->
+ ok = rabbit_event:notify(
+ exchange_deleted, [{name, XName}]),
+ del_notify(Bs),
+ x_callback(Serial, X, delete, Bs);
+ (_XName, {X, not_deleted, Bs, Serial}, ok) ->
+ del_notify(Bs),
+ x_callback(Serial, X, remove_bindings, Bs)
+ end, ok, AugmentedDeletions)
+ end.
+
+del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs].
+
+x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]).