summaryrefslogtreecommitdiff
path: root/src/rabbit_binding.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-24 10:46:34 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-03-24 10:46:34 +0000
commitc18f07aaa2b4de1442215c8b2d3dda4404ccdecf (patch)
tree168d052e62e820cab223e505b08b95df69f2a97f /src/rabbit_binding.erl
parent5d51177b297d5425741b808fb6f78a2712a0376e (diff)
parentc3519792e0c7358da2c108b43e2214f2a9f08875 (diff)
downloadrabbitmq-server-c18f07aaa2b4de1442215c8b2d3dda4404ccdecf.tar.gz
Merge from default
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r--src/rabbit_binding.erl68
1 files changed, 57 insertions, 11 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 6167790e..cc7aea33 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -127,8 +127,7 @@ add(Binding, InnerFun) ->
fun (Tx) ->
ok = rabbit_exchange:callback(
Src, add_binding, [Tx, Src, B]),
- rabbit_event:notify_if(
- not Tx, binding_created, info(B))
+ process_addition(Src, B, Tx)
end;
[_] -> fun rabbit_misc:const_ok/1
end;
@@ -161,7 +160,7 @@ remove(Binding, InnerFun) ->
{error, _} = Err ->
rabbit_misc:const(Err);
{ok, Deletions} ->
- fun (Tx) -> ok = process_deletions(Deletions, Tx) end
+ fun (Tx) -> process_deletions(Deletions, Tx) end
end
end).
@@ -405,19 +404,66 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions, Tx) ->
+process_addition(Src, _B, transaction) ->
+ serial(Src);
+
+process_addition(_Src, B, _Serial) ->
+ ok = rabbit_event:notify(binding_created, info(B)).
+
+process_deletions(Deletions, transaction) ->
+ process_deletions(
+ fun (X, Bindings, Acc) ->
+ pd_callback(transaction, remove_bindings, X, Bindings),
+ dict:store(X, serial(X), Acc)
+ end,
+ fun (X, Bindings, Acc) ->
+ pd_callback(transaction, delete, X, Bindings),
+ dict:store(X, serial(X), Acc)
+ end,
+ Deletions, dict:new(), true);
+
+process_deletions(Deletions, Serials) ->
+ process_deletions(
+ fun (X, Bindings, Acc) ->
+ pd_callback(dict:fetch(X, Serials), remove_bindings, X, Bindings),
+ Acc
+ end,
+ fun (X, Bindings, Acc) ->
+ pd_callback(dict:fetch(X, Serials), delete, X, Bindings),
+ rabbit_event:notify(exchange_deleted, [{name, X#exchange.name}]),
+ Acc
+ end,
+ Deletions, ok, false).
+
+process_deletions(NotDeletedFun, DeletedFun, Deletions, Acc0, Tx) ->
dict:fold(
- fun (_XName, {X, Deleted, Bindings}, ok) ->
+ fun (_XName, {X, Deleted, Bindings}, Acc) ->
FlatBindings = lists:flatten(Bindings),
[rabbit_event:notify_if(not Tx, binding_deleted, info(B)) ||
B <- FlatBindings],
case Deleted of
not_deleted ->
- rabbit_exchange:callback(X, remove_bindings,
- [Tx, X, FlatBindings]);
+ NotDeletedFun(X, FlatBindings, Acc);
deleted ->
- rabbit_event:notify_if(not Tx, exchange_deleted,
- [{name, X#exchange.name}]),
- rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings])
+ DeletedFun(X, FlatBindings, Acc)
end
- end, ok, Deletions).
+ end, Acc0, Deletions).
+
+pd_callback(Arg, CB, X, Bindings) ->
+ ok = rabbit_exchange:callback(X, CB, [Arg, X, Bindings]).
+
+serial(X) ->
+ case rabbit_exchange:callback(X, serialise_events, [X]) of
+ true -> next_serial(X);
+ false -> none
+ end.
+
+next_serial(#exchange{name = Name}) ->
+ Prev = case mnesia:read(rabbit_exchange_serial, Name, write) of
+ [] -> 0;
+ [#exchange_serial{serial = S}] -> S
+ end,
+ Serial = Prev + 1,
+ mnesia:write(rabbit_exchange_serial,
+ #exchange_serial{name = Name, serial = Serial}, write),
+ Serial.