diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-09-28 22:19:29 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-09-28 22:19:29 +0100 |
commit | a206e2fac02a48d60d4dc386b2080d2ab7d91e60 (patch) | |
tree | 84d47b9ad723bdcbf27f0657edb4df1a3f008c43 | |
parent | 387c9a72b16d491104aaf67069c20a987f3dc418 (diff) | |
download | rabbitmq-server-a206e2fac02a48d60d4dc386b2080d2ab7d91e60.tar.gz |
cosmetic changes and some minor refactoring
-rw-r--r-- | include/rabbit.hrl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 183 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 3 |
4 files changed, 87 insertions, 111 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 0ab51b36..b0890144 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -45,12 +45,8 @@ -record(amqqueue, {name, durable, auto_delete, arguments, pid}). -%% In the route and reverse route record definitions, -%% theconstant field seems to be required because the -%% underlying storage is ets, which stores key value pairs. -%% The binding field is made up of an {Exchange, Binding, Queue} +%% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). -%% The reverse_binding field is made up of an {Queue, Binding, Exchange} -record(reverse_route, {reverse_binding, value = const}). -record(binding, {exchange_name, key, queue_name}). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 22d856da..f825a6d2 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -65,9 +65,6 @@ publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). -spec(route/2 :: (exchange(), routing_key()) -> [pid()]). --spec(add_binding/1 :: (binding()) -> 'ok' | not_found() | - {'error', 'durability_settings_incompatible'}). --spec(delete_binding/1 :: (binding()) -> 'ok' | not_found()). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -84,30 +81,21 @@ %%---------------------------------------------------------------------------- recover() -> - % TODO: These two functions share commonalities, hence - % maybe a refactoring target - ok = recover_durable_exchanges(), - ok = recover_durable_routes(), - ok. - -recover_durable_routes() -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:foldl(fun (Route, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(Route), - ok = mnesia:write(ReverseRoute), - Acc - end, ok, durable_routes) - end). - -recover_durable_exchanges() -> rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:foldl(fun (Exchange, Acc) -> - ok = mnesia:write(Exchange), - Acc - end, ok, durable_exchanges) + mnesia:foldl( + fun (Exchange, Acc) -> + ok = mnesia:write(Exchange), + Acc + end, ok, durable_exchanges), + mnesia:foldl( + fun (Route, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(Route), + ok = mnesia:write(ReverseRoute), + Acc + end, ok, durable_routes), + ok end). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> @@ -164,12 +152,13 @@ list_vhost_exchanges(VHostPath) -> #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). routes_for_exchange(Name) -> - qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}} - <- mnesia:table(route), + qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}} + <- mnesia:table(route), N == Name])). %% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> +simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, + ContentTypeBin, BodyBin) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, properties = #'P_basic'{content_type = ContentTypeBin}, @@ -199,9 +188,8 @@ simple_publish(Mandatory, Immediate, %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -% -% TODO: This returns a list of QPids to route to. -% Maybe this should be handled by a cursor instead. +%% +%% TODO: Maybe this should be handled by a cursor instead. route(#exchange{name = Name, type = topic}, RoutingKey) -> Query = qlc:q([QName || #route{binding = #binding{ @@ -222,8 +210,8 @@ route(X = #exchange{type = direct}, RoutingKey) -> route_internal(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, - key = RoutingKey, - queue_name = '$1'}}, + queue_name = '$1', + key = RoutingKey}}, lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])). lookup_qpids(Queues) -> @@ -233,85 +221,60 @@ lookup_qpids(Queues) -> [QPid | Acc] end, [], sets:from_list(Queues)). -% TODO: Should all of the route and binding management not be refactored to -% it's own module, especially seeing as unbind will have to be implemented -% for 0.91 ? -delete_routes(QueueName) -> - Binding = #binding{queue_name = QueueName, - exchange_name = '_', key = '_'}, +%% TODO: Should all of the route and binding management not be +%% refactored to its own module, especially seeing as unbind will have +%% to be implemented for 0.91 ? +delete_bindings(QueueName) -> + Binding = #binding{exchange_name = '_', + queue_name = QueueName, + key = '_'}, {Route, ReverseRoute} = route_with_reverse(Binding), ok = mnesia:delete_object(Route), ok = mnesia:delete_object(ReverseRoute), ok = mnesia:delete_object(durable_routes, Route, write). -% TODO: Don't really like this double lookup, it *seems* very clunky -call_with_exchange_and_queue(#binding{exchange_name = Exchange, - queue_name = Queue}, Fun) -> - case mnesia:wread({exchange, Exchange}) of - [] -> {error, exchange_not_found}; - [X] -> - case mnesia:wread({amqqueue, Queue}) of - [] -> {error, queue_not_found}; - [Q] -> - Fun(X, Q) - end - end. - +call_with_exchange_and_queue(Exchange, Queue, Fun) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({exchange, Exchange}) of + [] -> {error, exchange_not_found}; + [X] -> + case mnesia:wread({amqqueue, Queue}) of + [] -> {error, queue_not_found}; + [Q] -> + Fun(X, Q) + end + end + end). add_binding(ExchangeName, QueueName, RoutingKey, _Arguments) -> - Binding = #binding{exchange_name = ExchangeName, - key = RoutingKey, - queue_name = QueueName}, - rabbit_misc:execute_mnesia_transaction(fun() -> add_binding(Binding) end). + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> + ok = sync_binding( + ExchangeName, QueueName, RoutingKey, + Q#amqqueue.durable, fun mnesia:write/3) + end + end). delete_binding(ExchangeName, QueueName, RoutingKey, _Arguments) -> - Binding = #binding{exchange_name = ExchangeName, - key = RoutingKey, - queue_name = QueueName}, - rabbit_misc:execute_mnesia_transaction(fun() -> delete_binding(Binding) - end). - -% Must be called from within a transaction -add_binding(Binding) -> call_with_exchange_and_queue( - Binding, - fun (X, Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> - ok = sync_binding(Binding, Q#amqqueue.durable, - fun mnesia:write/3) - end - end). - -% Must be called from within a transaction -delete_binding(Binding) -> - call_with_exchange_and_queue( - Binding, - fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable, - fun mnesia:delete_object/3), - maybe_auto_delete(X) - end). - -% TODO: Should the exported function not get renamed to delete_routes -% instead of this indirection? -delete_bindings(QueueName) -> - delete_routes(QueueName). - -%% Must run within a transaction. -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; -maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) -> - case internal_delete(ExchangeName, true) of - {error, in_use} -> ok; - Other -> Other - end. - -reverse_binding(#binding{exchange_name = Exchange, key = Key, - queue_name = Queue}) -> - #reverse_binding{exchange_name = Exchange, key = Key, queue_name = Queue}. + ExchangeName, QueueName, + fun (X, Q) -> + ok = sync_binding( + ExchangeName, QueueName, RoutingKey, + Q#amqqueue.durable, fun mnesia:delete_object/3), + maybe_auto_delete(X) + end). %% Must run within a transaction. -sync_binding(Binding, Durable, Fun) -> +sync_binding(ExchangeName, QueueName, RoutingKey, Durable, Fun) -> + Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey}, ok = case Durable of true -> Fun(durable_routes, #route{binding = Binding}, write); false -> ok @@ -320,6 +283,15 @@ sync_binding(Binding, Durable, Fun) -> <- tuple_to_list(route_with_reverse(Binding))], ok. +%% Must run within a transaction. +maybe_auto_delete(#exchange{auto_delete = false}) -> + ok; +maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) -> + case internal_delete(ExchangeName, true) of + {error, in_use} -> ok; + Other -> Other + end. + route_with_reverse(#route{binding = Binding}) -> route_with_reverse(Binding); route_with_reverse(Binding = #binding{}) -> @@ -327,6 +299,13 @@ route_with_reverse(Binding = #binding{}) -> ReverseRoute = #reverse_route{reverse_binding = reverse_binding(Binding)}, {Route, ReverseRoute}. +reverse_binding(#binding{exchange_name = Exchange, + queue_name = Queue, + key = Key}) -> + #reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key}. + split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), KeySplit. @@ -368,8 +347,8 @@ internal_delete(ExchangeName, _IfUnused = true) -> internal_delete(ExchangeName, false) -> do_internal_delete(ExchangeName, routes_for_exchange(ExchangeName)). -% Don't know if iterating over a list in process memory is cool -% Maybe we should iterate over the DB cursor? +%% TODO: Don't know if iterating over a list in process memory is cool +%% Maybe we should iterate over the DB cursor? do_internal_delete(ExchangeName, Routes) -> case mnesia:wread({exchange, ExchangeName}) of [] -> {error, not_found}; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ad715cf7..60c32da4 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -229,10 +229,10 @@ with_user_and_vhost(Username, VHostPath, Thunk) -> with_user(Username, with_vhost(VHostPath, Thunk)). -%% Making this a sync_transaction allows us to use dirty_read -%% elsewhere and get a consistent result even when that read -%% executes on a different node. execute_mnesia_transaction(TxFun) -> + %% Making this a sync_transaction allows us to use dirty_read + %% elsewhere and get a consistent result even when that read + %% executes on a different node. case mnesia:sync_transaction(TxFun) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index bfb42746..04979e2a 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -109,7 +109,8 @@ table_definitions() -> {type, ordered_set}, {record_name, route}, {attributes, record_info(fields, route)}]}, - {route, [{type, ordered_set}, {attributes, record_info(fields, route)}]}, + {route, [{type, ordered_set}, + {attributes, record_info(fields, route)}]}, {reverse_route, [{type, ordered_set}, {attributes, record_info(fields, reverse_route)}]}, {durable_exchanges, [{disc_copies, [node()]}, |