diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-03-31 17:54:19 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-03-31 17:54:19 +0100 |
commit | 9f5fcfc3b5ac739ac084cafea30f41715a669e85 (patch) | |
tree | c2ea7a5c35e6ceee6cf64630680bfd7da5142322 | |
parent | 3833c25b23c209f3c5a77d14ec459b15c82b7f55 (diff) | |
download | rabbitmq-server-9f5fcfc3b5ac739ac084cafea30f41715a669e85.tar.gz |
Introduce rabbit_semi_durable_route to kee bindings between transient exchanges and durable queues alive if the durable queue is down.
-rw-r--r-- | src/rabbit_binding.erl | 28 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 5 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 6 |
3 files changed, 30 insertions, 9 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index fff9016c..8a1c3c35 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -98,6 +98,10 @@ recover(Xs, Qs) -> XNames = sets:from_list([Name || #exchange{name = Name} <- Xs]), QNames = sets:from_list([Name || #amqqueue{name = Name} <- Qs]), rabbit_misc:table_fold( + fun (Route, ok) -> + ok = mnesia:write(rabbit_semi_durable_route, Route, write) + end, ok, rabbit_durable_route), + rabbit_misc:table_fold( fun (Route = #route{binding = B}, Acc) -> case should_recover(B, XNames, QNames) of true -> {_, Rev} = route_with_reverse(Route), @@ -106,7 +110,7 @@ recover(Xs, Qs) -> [B | Acc]; false -> Acc end - end, [], rabbit_durable_route). + end, [], rabbit_semi_durable_route). should_recover(B = #binding{destination = Dest = #resource{ kind = Kind }}, XNames, QNames) -> @@ -138,7 +142,7 @@ add(Binding, InnerFun) -> case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> ok = sync_binding(B, all_durable([Src, Dst]), + [] -> ok = sync_binding(B, Src, Dst, fun mnesia:write/3), fun (Tx) -> ok = rabbit_exchange:callback( @@ -166,7 +170,7 @@ remove(Binding, InnerFun) -> [_] -> case InnerFun(Src, Dst) of ok -> - ok = sync_binding(B, all_durable([Src, Dst]), + ok = sync_binding(B, Src, Dst, fun mnesia:delete_object/3), {ok, maybe_auto_delete(B#binding.source, [B], new_deletions())}; @@ -239,7 +243,8 @@ has_for_source(SrcName) -> %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure - contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). + contains(rabbit_route, Match) orelse contains(rabbit_semi_durable_route, + Match). remove_for_source(SrcName) -> [begin @@ -276,13 +281,17 @@ binding_action(Binding = #binding{source = SrcName, Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). -sync_binding(Binding, Durable, Fun) -> - ok = case Durable of - true -> Fun(rabbit_durable_route, - #route{binding = Binding}, write); +sync_binding(Binding, Src, Dest, Fun) -> + {Route, ReverseRoute} = route_with_reverse(Binding), + ok = case all_durable([Src, Dest]) of + true -> Fun(rabbit_durable_route, Route, write); false -> ok end, - {Route, ReverseRoute} = route_with_reverse(Binding), + ok = case Dest of + #amqqueue{durable = true} -> Fun(rabbit_semi_durable_route, Route, + write); + _ -> ok + end, ok = Fun(rabbit_route, Route, write), ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. @@ -363,6 +372,7 @@ maybe_auto_delete(XName, Bindings, Deletions) -> delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), + ok = mnesia:delete_object(rabbit_semi_durable_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). delete_transient_forward_routes(Route) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index fbcf07ae..77b06d0c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -187,6 +187,11 @@ table_definitions() -> {attributes, record_info(fields, route)}, {disc_copies, [node()]}, {match, #route{binding = binding_match(), _='_'}}]}, + {rabbit_semi_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_route, [{record_name, route}, {attributes, record_info(fields, route)}, diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 7567c29e..842c3b4f 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -26,6 +26,7 @@ -rabbit_upgrade({internal_exchanges, mnesia, []}). -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). -rabbit_upgrade({topic_trie, mnesia, []}). +-rabbit_upgrade({semi_durable_route, mnesia, []}). %% ------------------------------------------------------------------- @@ -37,6 +38,7 @@ -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). -spec(topic_trie/0 :: () -> 'ok'). +-spec(semi_durable_route/0 :: () -> 'ok'). -endif. @@ -101,6 +103,10 @@ topic_trie() -> {attributes, [trie_binding, value]}, {type, ordered_set}]). +semi_durable_route() -> + create(rabbit_semi_durable_route, [{record_name, route}, + {attributes, [binding, value]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |