summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-04-08 15:46:38 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-04-08 15:46:38 +0100
commit9579f96dd9507569d4e4f6576673137e3a329193 (patch)
tree2dce15b93be8d79d280f0f4727241d7bfcc64d21
parent16d52ee78e6924f4631edae6b9b0221efcd1d918 (diff)
parent34ecbeb4c9b16cb44dd0bdbc757a96e7f77af944 (diff)
downloadrabbitmq-server-9579f96dd9507569d4e4f6576673137e3a329193.tar.gz
Merge in default.
-rw-r--r--src/rabbit_binding.erl41
-rw-r--r--src/rabbit_mnesia.erl5
-rw-r--r--src/rabbit_upgrade_functions.erl6
3 files changed, 39 insertions, 13 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index ca7be59a..e4827526 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -98,6 +98,13 @@ recover(XNames, QNames) ->
XNameSet = sets:from_list(XNames),
QNameSet = sets:from_list(QNames),
rabbit_misc:table_filter(
+ fun (_Route) -> true end,
+ fun (Route, true) ->
+ ok = mnesia:write(rabbit_semi_durable_route, Route, write);
+ (_Route, false) ->
+ ok
+ end, rabbit_durable_route),
+ rabbit_misc:table_filter(
fun (#route{binding = B = #binding{destination = Dst =
#resource{kind = Kind}}}) ->
%% The check against rabbit_durable_route is in case it
@@ -106,7 +113,7 @@ recover(XNames, QNames) ->
exchange -> XNameSet;
queue -> QNameSet
end) andalso
- mnesia:read({rabbit_durable_route, B}) =/= []
+ mnesia:read({rabbit_semi_durable_route, B}) =/= []
end,
fun (R = #route{binding = B = #binding{source = Src}}, Tx) ->
case Tx of
@@ -116,7 +123,7 @@ recover(XNames, QNames) ->
{ok, X} = rabbit_exchange:lookup(Src),
rabbit_exchange:callback(X, add_binding, [Tx, X, B])
end,
- rabbit_durable_route),
+ rabbit_semi_durable_route),
ok.
exists(Binding) ->
@@ -147,7 +154,8 @@ add(Src, Dst, B) ->
[] -> Durable = all_durable([Src, Dst]),
case (not Durable orelse
mnesia:read({rabbit_durable_route, B}) =:= []) of
- true -> ok = sync_binding(B, Durable, fun mnesia:write/3),
+ true -> ok = sync_binding(B, Durable, durable(Dst),
+ fun mnesia:write/3),
fun (Tx) ->
ok = rabbit_exchange:callback(
Src, add_binding, [Tx, Src, B]),
@@ -172,6 +180,7 @@ remove(Binding, InnerFun) ->
case InnerFun(Src, Dst) of
ok ->
ok = sync_binding(B, all_durable([Src, Dst]),
+ durable(Dst),
fun mnesia:delete_object/3),
{ok, maybe_auto_delete(B#binding.source,
[B], new_deletions())};
@@ -244,7 +253,8 @@ has_for_source(SrcName) ->
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
- contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
+ contains(rabbit_route, Match) orelse contains(rabbit_semi_durable_route,
+ Match).
remove_for_source(SrcName) ->
[begin
@@ -266,10 +276,10 @@ remove_transient_for_destination(DstName) ->
%%----------------------------------------------------------------------------
-all_durable(Resources) ->
- lists:all(fun (#exchange{durable = D}) -> D;
- (#amqqueue{durable = D}) -> D
- end, Resources).
+all_durable(Resources) -> lists:all(fun durable/1, Resources).
+
+durable(#exchange{durable = D}) -> D;
+durable(#amqqueue{durable = D}) -> D.
binding_action(Binding = #binding{source = SrcName,
destination = DstName,
@@ -281,12 +291,16 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
-sync_binding(Binding, true, Fun) ->
- ok = Fun(rabbit_durable_route, #route{binding = Binding}, write),
- ok = sync_transient_binding(Binding, Fun);
+sync_binding(Binding, true, SemiDurable, Fun) ->
+ Fun(rabbit_durable_route, #route{binding = Binding}, write),
+ sync_binding(Binding, false, SemiDurable, Fun);
+
+sync_binding(Binding, false, true, Fun) ->
+ Fun(rabbit_semi_durable_route, #route{binding = Binding}, write),
+ sync_binding(Binding, false, false, Fun);
-sync_binding(Binding, false, Fun) ->
- ok = sync_transient_binding(Binding, Fun).
+sync_binding(Binding, false, false, Fun) ->
+ sync_transient_binding(Binding, Fun).
sync_transient_binding(Binding, Fun) ->
{Route, ReverseRoute} = route_with_reverse(Binding),
@@ -369,6 +383,7 @@ maybe_auto_delete(XName, Bindings, Deletions) ->
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_semi_durable_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
delete_transient_forward_routes(Route) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index fbcf07ae..77b06d0c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -187,6 +187,11 @@ table_definitions() ->
{attributes, record_info(fields, route)},
{disc_copies, [node()]},
{match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_semi_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_route,
[{record_name, route},
{attributes, record_info(fields, route)},
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 7567c29e..842c3b4f 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -26,6 +26,7 @@
-rabbit_upgrade({internal_exchanges, mnesia, []}).
-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
-rabbit_upgrade({topic_trie, mnesia, []}).
+-rabbit_upgrade({semi_durable_route, mnesia, []}).
%% -------------------------------------------------------------------
@@ -37,6 +38,7 @@
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
+-spec(semi_durable_route/0 :: () -> 'ok').
-endif.
@@ -101,6 +103,10 @@ topic_trie() ->
{attributes, [trie_binding, value]},
{type, ordered_set}]).
+semi_durable_route() ->
+ create(rabbit_semi_durable_route, [{record_name, route},
+ {attributes, [binding, value]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->