diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-04-08 15:46:38 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-04-08 15:46:38 +0100 |
commit | 9579f96dd9507569d4e4f6576673137e3a329193 (patch) | |
tree | 2dce15b93be8d79d280f0f4727241d7bfcc64d21 | |
parent | 16d52ee78e6924f4631edae6b9b0221efcd1d918 (diff) | |
parent | 34ecbeb4c9b16cb44dd0bdbc757a96e7f77af944 (diff) | |
download | rabbitmq-server-9579f96dd9507569d4e4f6576673137e3a329193.tar.gz |
Merge in default.
-rw-r--r-- | src/rabbit_binding.erl | 41 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 5 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 6 |
3 files changed, 39 insertions, 13 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index ca7be59a..e4827526 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -98,6 +98,13 @@ recover(XNames, QNames) -> XNameSet = sets:from_list(XNames), QNameSet = sets:from_list(QNames), rabbit_misc:table_filter( + fun (_Route) -> true end, + fun (Route, true) -> + ok = mnesia:write(rabbit_semi_durable_route, Route, write); + (_Route, false) -> + ok + end, rabbit_durable_route), + rabbit_misc:table_filter( fun (#route{binding = B = #binding{destination = Dst = #resource{kind = Kind}}}) -> %% The check against rabbit_durable_route is in case it @@ -106,7 +113,7 @@ recover(XNames, QNames) -> exchange -> XNameSet; queue -> QNameSet end) andalso - mnesia:read({rabbit_durable_route, B}) =/= [] + mnesia:read({rabbit_semi_durable_route, B}) =/= [] end, fun (R = #route{binding = B = #binding{source = Src}}, Tx) -> case Tx of @@ -116,7 +123,7 @@ recover(XNames, QNames) -> {ok, X} = rabbit_exchange:lookup(Src), rabbit_exchange:callback(X, add_binding, [Tx, X, B]) end, - rabbit_durable_route), + rabbit_semi_durable_route), ok. exists(Binding) -> @@ -147,7 +154,8 @@ add(Src, Dst, B) -> [] -> Durable = all_durable([Src, Dst]), case (not Durable orelse mnesia:read({rabbit_durable_route, B}) =:= []) of - true -> ok = sync_binding(B, Durable, fun mnesia:write/3), + true -> ok = sync_binding(B, Durable, durable(Dst), + fun mnesia:write/3), fun (Tx) -> ok = rabbit_exchange:callback( Src, add_binding, [Tx, Src, B]), @@ -172,6 +180,7 @@ remove(Binding, InnerFun) -> case InnerFun(Src, Dst) of ok -> ok = sync_binding(B, all_durable([Src, Dst]), + durable(Dst), fun mnesia:delete_object/3), {ok, maybe_auto_delete(B#binding.source, [B], new_deletions())}; @@ -244,7 +253,8 @@ has_for_source(SrcName) -> %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure - contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). + contains(rabbit_route, Match) orelse contains(rabbit_semi_durable_route, + Match). remove_for_source(SrcName) -> [begin @@ -266,10 +276,10 @@ remove_transient_for_destination(DstName) -> %%---------------------------------------------------------------------------- -all_durable(Resources) -> - lists:all(fun (#exchange{durable = D}) -> D; - (#amqqueue{durable = D}) -> D - end, Resources). +all_durable(Resources) -> lists:all(fun durable/1, Resources). + +durable(#exchange{durable = D}) -> D; +durable(#amqqueue{durable = D}) -> D. binding_action(Binding = #binding{source = SrcName, destination = DstName, @@ -281,12 +291,16 @@ binding_action(Binding = #binding{source = SrcName, Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). -sync_binding(Binding, true, Fun) -> - ok = Fun(rabbit_durable_route, #route{binding = Binding}, write), - ok = sync_transient_binding(Binding, Fun); +sync_binding(Binding, true, SemiDurable, Fun) -> + Fun(rabbit_durable_route, #route{binding = Binding}, write), + sync_binding(Binding, false, SemiDurable, Fun); + +sync_binding(Binding, false, true, Fun) -> + Fun(rabbit_semi_durable_route, #route{binding = Binding}, write), + sync_binding(Binding, false, false, Fun); -sync_binding(Binding, false, Fun) -> - ok = sync_transient_binding(Binding, Fun). +sync_binding(Binding, false, false, Fun) -> + sync_transient_binding(Binding, Fun). sync_transient_binding(Binding, Fun) -> {Route, ReverseRoute} = route_with_reverse(Binding), @@ -369,6 +383,7 @@ maybe_auto_delete(XName, Bindings, Deletions) -> delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), + ok = mnesia:delete_object(rabbit_semi_durable_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). delete_transient_forward_routes(Route) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index fbcf07ae..77b06d0c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -187,6 +187,11 @@ table_definitions() -> {attributes, record_info(fields, route)}, {disc_copies, [node()]}, {match, #route{binding = binding_match(), _='_'}}]}, + {rabbit_semi_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_route, [{record_name, route}, {attributes, record_info(fields, route)}, diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 7567c29e..842c3b4f 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -26,6 +26,7 @@ -rabbit_upgrade({internal_exchanges, mnesia, []}). -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). -rabbit_upgrade({topic_trie, mnesia, []}). +-rabbit_upgrade({semi_durable_route, mnesia, []}). %% ------------------------------------------------------------------- @@ -37,6 +38,7 @@ -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). -spec(topic_trie/0 :: () -> 'ok'). +-spec(semi_durable_route/0 :: () -> 'ok'). -endif. @@ -101,6 +103,10 @@ topic_trie() -> {attributes, [trie_binding, value]}, {type, ordered_set}]). +semi_durable_route() -> + create(rabbit_semi_durable_route, [{record_name, route}, + {attributes, [binding, value]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |