summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-09-28 22:19:29 +0100
committerMatthias Radestock <matthias@lshift.net>2008-09-28 22:19:29 +0100
commita206e2fac02a48d60d4dc386b2080d2ab7d91e60 (patch)
tree84d47b9ad723bdcbf27f0657edb4df1a3f008c43
parent387c9a72b16d491104aaf67069c20a987f3dc418 (diff)
downloadrabbitmq-server-a206e2fac02a48d60d4dc386b2080d2ab7d91e60.tar.gz
cosmetic changes and some minor refactoring
-rw-r--r--include/rabbit.hrl6
-rw-r--r--src/rabbit_exchange.erl183
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_mnesia.erl3
4 files changed, 87 insertions, 111 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0ab51b36..b0890144 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -45,12 +45,8 @@
-record(amqqueue, {name, durable, auto_delete, arguments, pid}).
-%% In the route and reverse route record definitions,
-%% theconstant field seems to be required because the
-%% underlying storage is ets, which stores key value pairs.
-%% The binding field is made up of an {Exchange, Binding, Queue}
+%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
-%% The reverse_binding field is made up of an {Queue, Binding, Exchange}
-record(reverse_route, {reverse_binding, value = const}).
-record(binding, {exchange_name, key, queue_name}).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 22d856da..f825a6d2 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -65,9 +65,6 @@
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
-spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
--spec(add_binding/1 :: (binding()) -> 'ok' | not_found() |
- {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/1 :: (binding()) -> 'ok' | not_found()).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -84,30 +81,21 @@
%%----------------------------------------------------------------------------
recover() ->
- % TODO: These two functions share commonalities, hence
- % maybe a refactoring target
- ok = recover_durable_exchanges(),
- ok = recover_durable_routes(),
- ok.
-
-recover_durable_routes() ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:foldl(fun (Route, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(Route),
- ok = mnesia:write(ReverseRoute),
- Acc
- end, ok, durable_routes)
- end).
-
-recover_durable_exchanges() ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:foldl(fun (Exchange, Acc) ->
- ok = mnesia:write(Exchange),
- Acc
- end, ok, durable_exchanges)
+ mnesia:foldl(
+ fun (Exchange, Acc) ->
+ ok = mnesia:write(Exchange),
+ Acc
+ end, ok, durable_exchanges),
+ mnesia:foldl(
+ fun (Route, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(Route),
+ ok = mnesia:write(ReverseRoute),
+ Acc
+ end, ok, durable_routes),
+ ok
end).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
@@ -164,12 +152,13 @@ list_vhost_exchanges(VHostPath) ->
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
routes_for_exchange(Name) ->
- qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}}
- <- mnesia:table(route),
+ qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}}
+ <- mnesia:table(route),
N == Name])).
%% Usable by Erlang code that wants to publish messages.
-simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
+simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
+ ContentTypeBin, BodyBin) ->
{ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
Content = #content{class_id = ClassId,
properties = #'P_basic'{content_type = ContentTypeBin},
@@ -199,9 +188,8 @@ simple_publish(Mandatory, Immediate,
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
-%
-% TODO: This returns a list of QPids to route to.
-% Maybe this should be handled by a cursor instead.
+%%
+%% TODO: Maybe this should be handled by a cursor instead.
route(#exchange{name = Name, type = topic}, RoutingKey) ->
Query = qlc:q([QName ||
#route{binding = #binding{
@@ -222,8 +210,8 @@ route(X = #exchange{type = direct}, RoutingKey) ->
route_internal(#exchange{name = Name}, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
- key = RoutingKey,
- queue_name = '$1'}},
+ queue_name = '$1',
+ key = RoutingKey}},
lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])).
lookup_qpids(Queues) ->
@@ -233,85 +221,60 @@ lookup_qpids(Queues) ->
[QPid | Acc]
end, [], sets:from_list(Queues)).
-% TODO: Should all of the route and binding management not be refactored to
-% it's own module, especially seeing as unbind will have to be implemented
-% for 0.91 ?
-delete_routes(QueueName) ->
- Binding = #binding{queue_name = QueueName,
- exchange_name = '_', key = '_'},
+%% TODO: Should all of the route and binding management not be
+%% refactored to its own module, especially seeing as unbind will have
+%% to be implemented for 0.91 ?
+delete_bindings(QueueName) ->
+ Binding = #binding{exchange_name = '_',
+ queue_name = QueueName,
+ key = '_'},
{Route, ReverseRoute} = route_with_reverse(Binding),
ok = mnesia:delete_object(Route),
ok = mnesia:delete_object(ReverseRoute),
ok = mnesia:delete_object(durable_routes, Route, write).
-% TODO: Don't really like this double lookup, it *seems* very clunky
-call_with_exchange_and_queue(#binding{exchange_name = Exchange,
- queue_name = Queue}, Fun) ->
- case mnesia:wread({exchange, Exchange}) of
- [] -> {error, exchange_not_found};
- [X] ->
- case mnesia:wread({amqqueue, Queue}) of
- [] -> {error, queue_not_found};
- [Q] ->
- Fun(X, Q)
- end
- end.
-
+call_with_exchange_and_queue(Exchange, Queue, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({exchange, Exchange}) of
+ [] -> {error, exchange_not_found};
+ [X] ->
+ case mnesia:wread({amqqueue, Queue}) of
+ [] -> {error, queue_not_found};
+ [Q] ->
+ Fun(X, Q)
+ end
+ end
+ end).
add_binding(ExchangeName, QueueName, RoutingKey, _Arguments) ->
- Binding = #binding{exchange_name = ExchangeName,
- key = RoutingKey,
- queue_name = QueueName},
- rabbit_misc:execute_mnesia_transaction(fun() -> add_binding(Binding) end).
+ call_with_exchange_and_queue(
+ ExchangeName, QueueName,
+ fun (X, Q) ->
+ if Q#amqqueue.durable and not(X#exchange.durable) ->
+ {error, durability_settings_incompatible};
+ true ->
+ ok = sync_binding(
+ ExchangeName, QueueName, RoutingKey,
+ Q#amqqueue.durable, fun mnesia:write/3)
+ end
+ end).
delete_binding(ExchangeName, QueueName, RoutingKey, _Arguments) ->
- Binding = #binding{exchange_name = ExchangeName,
- key = RoutingKey,
- queue_name = QueueName},
- rabbit_misc:execute_mnesia_transaction(fun() -> delete_binding(Binding)
- end).
-
-% Must be called from within a transaction
-add_binding(Binding) ->
call_with_exchange_and_queue(
- Binding,
- fun (X, Q) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true ->
- ok = sync_binding(Binding, Q#amqqueue.durable,
- fun mnesia:write/3)
- end
- end).
-
-% Must be called from within a transaction
-delete_binding(Binding) ->
- call_with_exchange_and_queue(
- Binding,
- fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- maybe_auto_delete(X)
- end).
-
-% TODO: Should the exported function not get renamed to delete_routes
-% instead of this indirection?
-delete_bindings(QueueName) ->
- delete_routes(QueueName).
-
-%% Must run within a transaction.
-maybe_auto_delete(#exchange{auto_delete = false}) ->
- ok;
-maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) ->
- case internal_delete(ExchangeName, true) of
- {error, in_use} -> ok;
- Other -> Other
- end.
-
-reverse_binding(#binding{exchange_name = Exchange, key = Key,
- queue_name = Queue}) ->
- #reverse_binding{exchange_name = Exchange, key = Key, queue_name = Queue}.
+ ExchangeName, QueueName,
+ fun (X, Q) ->
+ ok = sync_binding(
+ ExchangeName, QueueName, RoutingKey,
+ Q#amqqueue.durable, fun mnesia:delete_object/3),
+ maybe_auto_delete(X)
+ end).
%% Must run within a transaction.
-sync_binding(Binding, Durable, Fun) ->
+sync_binding(ExchangeName, QueueName, RoutingKey, Durable, Fun) ->
+ Binding = #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey},
ok = case Durable of
true -> Fun(durable_routes, #route{binding = Binding}, write);
false -> ok
@@ -320,6 +283,15 @@ sync_binding(Binding, Durable, Fun) ->
<- tuple_to_list(route_with_reverse(Binding))],
ok.
+%% Must run within a transaction.
+maybe_auto_delete(#exchange{auto_delete = false}) ->
+ ok;
+maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) ->
+ case internal_delete(ExchangeName, true) of
+ {error, in_use} -> ok;
+ Other -> Other
+ end.
+
route_with_reverse(#route{binding = Binding}) ->
route_with_reverse(Binding);
route_with_reverse(Binding = #binding{}) ->
@@ -327,6 +299,13 @@ route_with_reverse(Binding = #binding{}) ->
ReverseRoute = #reverse_route{reverse_binding = reverse_binding(Binding)},
{Route, ReverseRoute}.
+reverse_binding(#binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key}) ->
+ #reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key}.
+
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
KeySplit.
@@ -368,8 +347,8 @@ internal_delete(ExchangeName, _IfUnused = true) ->
internal_delete(ExchangeName, false) ->
do_internal_delete(ExchangeName, routes_for_exchange(ExchangeName)).
-% Don't know if iterating over a list in process memory is cool
-% Maybe we should iterate over the DB cursor?
+%% TODO: Don't know if iterating over a list in process memory is cool
+%% Maybe we should iterate over the DB cursor?
do_internal_delete(ExchangeName, Routes) ->
case mnesia:wread({exchange, ExchangeName}) of
[] -> {error, not_found};
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ad715cf7..60c32da4 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -229,10 +229,10 @@ with_user_and_vhost(Username, VHostPath, Thunk) ->
with_user(Username, with_vhost(VHostPath, Thunk)).
-%% Making this a sync_transaction allows us to use dirty_read
-%% elsewhere and get a consistent result even when that read
-%% executes on a different node.
execute_mnesia_transaction(TxFun) ->
+ %% Making this a sync_transaction allows us to use dirty_read
+ %% elsewhere and get a consistent result even when that read
+ %% executes on a different node.
case mnesia:sync_transaction(TxFun) of
{atomic, Result} -> Result;
{aborted, Reason} -> throw({error, Reason})
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index bfb42746..04979e2a 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -109,7 +109,8 @@ table_definitions() ->
{type, ordered_set},
{record_name, route},
{attributes, record_info(fields, route)}]},
- {route, [{type, ordered_set}, {attributes, record_info(fields, route)}]},
+ {route, [{type, ordered_set},
+ {attributes, record_info(fields, route)}]},
{reverse_route, [{type, ordered_set},
{attributes, record_info(fields, reverse_route)}]},
{durable_exchanges, [{disc_copies, [node()]},