summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-28 18:02:53 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-03-28 18:02:53 +0100
commita517487a1e91efb27b2f3654b153ff0d6cbb5fbe (patch)
treed3d84bfe11b83f83c574b19160b32b7c4912dc59
parent07df8c18c25d0fd69649e603252a12754ae7eed5 (diff)
downloadrabbitmq-server-a517487a1e91efb27b2f3654b153ff0d6cbb5fbe.tar.gz
Better abstraction.
-rw-r--r--src/rabbit_amqqueue.erl15
-rw-r--r--src/rabbit_binding.erl57
-rw-r--r--src/rabbit_exchange.erl10
3 files changed, 35 insertions, 47 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e300fa32..167b1a55 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -433,11 +433,7 @@ internal_delete(QueueName) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> rabbit_misc:const({error, not_found});
[_] -> Deletions = internal_delete1(QueueName),
- Serials = rabbit_binding:process_deletions(
- Deletions, transaction),
- fun () -> rabbit_binding:process_deletions(
- Deletions, Serials)
- end
+ rabbit_binding:process_deletions(Deletions)
end
end).
@@ -471,12 +467,9 @@ on_node_down(Node) ->
#amqqueue{name = QueueName, pid = Pid}
<- mnesia:table(rabbit_queue),
node(Pid) == Node])),
- Dels1 = lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(), Dels),
- Serials = rabbit_binding:process_deletions(Dels1, transaction),
- fun () ->
- rabbit_binding:process_deletions(Dels1, Serials)
- end
+ rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), Dels))
end).
delete_queue(QueueName) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 13e829e2..31605844 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -21,7 +21,7 @@
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
- process_deletions/2]).
+ process_deletions/1]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
@@ -77,7 +77,7 @@
(rabbit_types:binding_destination()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
--spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok').
+-spec(process_deletions/1 :: (deletions()) -> 'ok').
-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
-spec(add_deletion/3 :: (rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
@@ -160,11 +160,8 @@ remove(Binding, InnerFun) ->
end
end,
case Result of
- {error, _} = Err ->
- rabbit_misc:const(Err);
- {ok, Deletions} ->
- Serials = process_deletions(Deletions, transaction),
- fun () -> process_deletions(Deletions, Serials) end
+ {error, _} = Err -> rabbit_misc:const(Err);
+ {ok, Deletions} -> process_deletions(Deletions)
end
end).
@@ -408,27 +405,29 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions, transaction) ->
- process_deletions(
- fun (Deleted, X, Bindings, Acc) ->
- pd_callback(transaction, Deleted, X, Bindings),
- dict:store(X, serial(X), Acc)
- end,
- Deletions, dict:new());
-
-process_deletions(Deletions, Serials) ->
- process_deletions(
- fun (Deleted, X, Bindings, Acc) ->
- [rabbit_event:notify(binding_deleted, info(B)) || B <- Bindings],
- pd_callback(dict:fetch(X, Serials), Deleted, X, Bindings),
- case Deleted of
- deleted -> rabbit_event:notify(exchange_deleted,
- [{name, X#exchange.name}]);
- _ -> ok
+process_deletions(Deletions) ->
+ Serials = process_deletions(
+ fun (Deleted, X, Bindings, Acc) ->
+ pd_callback(transaction, Deleted, X, Bindings),
+ dict:store(X, serial(X), Acc)
+ end,
+ Deletions, dict:new()),
+ fun() ->
+ process_deletions(
+ fun (Deleted, X, Bindings, Acc) ->
+ [rabbit_event:notify(binding_deleted, info(B)) ||
+ B <- Bindings],
+ pd_callback(dict:fetch(X, Serials), Deleted, X, Bindings),
+ case Deleted of
+ deleted -> rabbit_event:notify(
+ exchange_deleted,
+ [{name, X#exchange.name}]);
+ _ -> ok
+ end,
+ Acc
end,
- Acc
- end,
- Deletions, ok).
+ Deletions, ok)
+ end.
process_deletions(Fun, Deletions, Acc0) ->
dict:fold(
@@ -436,8 +435,8 @@ process_deletions(Fun, Deletions, Acc0) ->
Fun(Deleted, X, lists:flatten(Bindings), Acc)
end, Acc0, Deletions).
-pd_callback(Arg, CB, X, Bindings) ->
- ok = rabbit_exchange:callback(X, case CB of
+pd_callback(Arg, Deleted, X, Bindings) ->
+ ok = rabbit_exchange:callback(X, case Deleted of
not_deleted -> remove_bindings;
deleted -> delete
end, [Arg, X, Bindings]).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index e704a44c..c1c1d3c8 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -290,13 +290,9 @@ delete0(XName, Fun) ->
fun (X) ->
case Fun(X) of
{deleted, X, Bs, Deletions} ->
- Dels1 = rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions),
- Serials = rabbit_binding:process_deletions(
- Dels1, transaction),
- fun () ->
- rabbit_binding:process_deletions(Dels1, Serials)
- end;
+ rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions));
{error, _InUseOrNotFound} = E ->
rabbit_misc:const(E)
end