summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-31 17:54:19 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-03-31 17:54:19 +0100
commit9f5fcfc3b5ac739ac084cafea30f41715a669e85 (patch)
treec2ea7a5c35e6ceee6cf64630680bfd7da5142322
parent3833c25b23c209f3c5a77d14ec459b15c82b7f55 (diff)
downloadrabbitmq-server-9f5fcfc3b5ac739ac084cafea30f41715a669e85.tar.gz
Introduce rabbit_semi_durable_route to kee bindings between transient exchanges and durable queues alive if the durable queue is down.
-rw-r--r--src/rabbit_binding.erl28
-rw-r--r--src/rabbit_mnesia.erl5
-rw-r--r--src/rabbit_upgrade_functions.erl6
3 files changed, 30 insertions, 9 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index fff9016c..8a1c3c35 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -98,6 +98,10 @@ recover(Xs, Qs) ->
XNames = sets:from_list([Name || #exchange{name = Name} <- Xs]),
QNames = sets:from_list([Name || #amqqueue{name = Name} <- Qs]),
rabbit_misc:table_fold(
+ fun (Route, ok) ->
+ ok = mnesia:write(rabbit_semi_durable_route, Route, write)
+ end, ok, rabbit_durable_route),
+ rabbit_misc:table_fold(
fun (Route = #route{binding = B}, Acc) ->
case should_recover(B, XNames, QNames) of
true -> {_, Rev} = route_with_reverse(Route),
@@ -106,7 +110,7 @@ recover(Xs, Qs) ->
[B | Acc];
false -> Acc
end
- end, [], rabbit_durable_route).
+ end, [], rabbit_semi_durable_route).
should_recover(B = #binding{destination = Dest = #resource{ kind = Kind }},
XNames, QNames) ->
@@ -138,7 +142,7 @@ add(Binding, InnerFun) ->
case InnerFun(Src, Dst) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] -> ok = sync_binding(B, all_durable([Src, Dst]),
+ [] -> ok = sync_binding(B, Src, Dst,
fun mnesia:write/3),
fun (Tx) ->
ok = rabbit_exchange:callback(
@@ -166,7 +170,7 @@ remove(Binding, InnerFun) ->
[_] ->
case InnerFun(Src, Dst) of
ok ->
- ok = sync_binding(B, all_durable([Src, Dst]),
+ ok = sync_binding(B, Src, Dst,
fun mnesia:delete_object/3),
{ok, maybe_auto_delete(B#binding.source,
[B], new_deletions())};
@@ -239,7 +243,8 @@ has_for_source(SrcName) ->
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
- contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
+ contains(rabbit_route, Match) orelse contains(rabbit_semi_durable_route,
+ Match).
remove_for_source(SrcName) ->
[begin
@@ -276,13 +281,17 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
-sync_binding(Binding, Durable, Fun) ->
- ok = case Durable of
- true -> Fun(rabbit_durable_route,
- #route{binding = Binding}, write);
+sync_binding(Binding, Src, Dest, Fun) ->
+ {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = case all_durable([Src, Dest]) of
+ true -> Fun(rabbit_durable_route, Route, write);
false -> ok
end,
- {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = case Dest of
+ #amqqueue{durable = true} -> Fun(rabbit_semi_durable_route, Route,
+ write);
+ _ -> ok
+ end,
ok = Fun(rabbit_route, Route, write),
ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
@@ -363,6 +372,7 @@ maybe_auto_delete(XName, Bindings, Deletions) ->
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_semi_durable_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
delete_transient_forward_routes(Route) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index fbcf07ae..77b06d0c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -187,6 +187,11 @@ table_definitions() ->
{attributes, record_info(fields, route)},
{disc_copies, [node()]},
{match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_semi_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
{rabbit_route,
[{record_name, route},
{attributes, record_info(fields, route)},
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 7567c29e..842c3b4f 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -26,6 +26,7 @@
-rabbit_upgrade({internal_exchanges, mnesia, []}).
-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
-rabbit_upgrade({topic_trie, mnesia, []}).
+-rabbit_upgrade({semi_durable_route, mnesia, []}).
%% -------------------------------------------------------------------
@@ -37,6 +38,7 @@
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
+-spec(semi_durable_route/0 :: () -> 'ok').
-endif.
@@ -101,6 +103,10 @@ topic_trie() ->
{attributes, [trie_binding, value]},
{type, ordered_set}]).
+semi_durable_route() ->
+ create(rabbit_semi_durable_route, [{record_name, route},
+ {attributes, [binding, value]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->