diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-17 11:42:08 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-17 11:42:08 +0100 |
commit | 133cde4888174bc7d9a294cdf1895ae4ed274708 (patch) | |
tree | 7d9c3aeff925c66e0bf95b6c246b88951bb9c61a | |
parent | 717a475abd6d11522cdd86338201134c9fb74eae (diff) | |
parent | 4ee9f333afe549f6aab8dffd97d2488bd1a72b6d (diff) | |
download | rabbitmq-server-133cde4888174bc7d9a294cdf1895ae4ed274708.tar.gz |
merge heads
-rw-r--r-- | docs/rabbitmqctl.1.xml | 31 | ||||
-rw-r--r-- | include/rabbit.hrl | 4 | ||||
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 45 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 346 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 78 | ||||
-rw-r--r-- | src/rabbit_control.erl | 21 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 95 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 9 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 15 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 13 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 10 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 109 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 12 | ||||
-rw-r--r-- | src/rabbit_plugin_activator.erl | 20 | ||||
-rw-r--r-- | src/rabbit_queue_collector.erl | 41 | ||||
-rw-r--r-- | src/rabbit_router.erl | 58 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit_types.erl | 19 |
22 files changed, 538 insertions, 421 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 73882861..3b7244c7 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -894,16 +894,31 @@ </para> <variablelist> <varlistentry> - <term>exchange_name</term> - <listitem><para>The name of the exchange to which the - binding is attached. with non-ASCII characters - escaped as in C.</para></listitem> + <term>source_name</term> + <listitem><para>The name of the source of messages to + which the binding is attached. With non-ASCII + characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> - <term>queue_name</term> - <listitem><para>The name of the queue to which the - binding is attached. with non-ASCII characters - escaped as in C.</para></listitem> + <term>source_kind</term> + <listitem><para>The kind of the source of messages to + which the binding is attached. Currently always + queue. With non-ASCII characters escaped as in + C.</para></listitem> + </varlistentry> + <varlistentry> + <term>destination_name</term> + <listitem><para>The name of the destination of + messages to which the binding is attached. With + non-ASCII characters escaped as in + C.</para></listitem> + </varlistentry> + <varlistentry> + <term>destination_kind</term> + <listitem><para>The kind of the destination of + messages to which the binding is attached. With + non-ASCII characters escaped as in + C.</para></listitem> </varlistentry> <varlistentry> <term>routing_key</term> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 73a8ad97..af6e257a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -60,8 +60,8 @@ -record(route, {binding, value = const}). -record(reverse_route, {reverse_binding, value = const}). --record(binding, {exchange_name, key, queue_name, args = []}). --record(reverse_binding, {queue_name, key, exchange_name, args = []}). +-record(binding, {source, key, destination, args = []}). +-record(reverse_binding, {destination, key, source, args = []}). -record(listener, {node, protocol, host, port}). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index cecd666b..ae326a87 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -31,8 +31,8 @@ -ifdef(use_specs). -spec(description/0 :: () -> [{atom(), any()}]). --spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) - -> {rabbit_router:routing_result(), [pid()]}). +-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(create/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(recover/2 :: (rabbit_types:exchange(), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 24320f51..2389ec86 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]). +-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, @@ -115,9 +115,7 @@ (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). -spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). --spec(delete_exclusive/1 :: (rabbit_types:amqqueue()) - -> rabbit_types:ok_or_error2(qlen(), - 'not_exclusive')). +-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -254,10 +252,10 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_binding:add(#binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = []}). + rabbit_binding:add(#binding{source = ExchangeName, + destination = QueueName, + key = RoutingKey, + args = []}). lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -362,8 +360,8 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). -delete_exclusive(#amqqueue{ pid = QPid }) -> - gen_server2:call(QPid, delete_exclusive, infinity). +delete_immediately(#amqqueue{ pid = QPid }) -> + gen_server2:cast(QPid, delete_immediately). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -439,7 +437,7 @@ internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_durable_queue, QueueName}), %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. - rabbit_binding:remove_for_queue(QueueName). + rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> case rabbit_misc:execute_mnesia_transaction( @@ -450,8 +448,7 @@ internal_delete(QueueName) -> end end) of {error, _} = Err -> Err; - PostHook -> PostHook(), - ok + Deletions -> ok = rabbit_binding:process_deletions(Deletions) end. maybe_run_queue_via_backing_queue(QPid, Fun) -> @@ -470,19 +467,20 @@ maybe_expire(QPid) -> gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> - [Hook() || - Hook <- rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end)], - ok. + rabbit_binding:process_deletions( + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + rabbit_misc:execute_mnesia_transaction( + fun () -> qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end))). delete_queue(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), - rabbit_binding:remove_transient_for_queue(QueueName). + rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, @@ -508,4 +506,3 @@ delegate_call(Pid, Msg, Timeout) -> delegate_cast(Pid, Msg) -> delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). - diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 61204deb..19db731a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -594,7 +594,6 @@ prioritise_call(Msg, _From, _State) -> info -> 9; {info, _Items} -> 9; consumers -> 9; - delete_exclusive -> 8; {maybe_run_queue_via_backing_queue, _Fun} -> 6; _ -> 0 end. @@ -602,6 +601,7 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of update_ram_duration -> 8; + delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; maybe_expire -> 8; @@ -787,16 +787,6 @@ handle_call(stat, _From, State = #q{backing_queue = BQ, reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, ensure_expiry_timer(State)); -handle_call(delete_exclusive, _From, - State = #q{ backing_queue_state = BQS, - backing_queue = BQ, - q = #amqqueue{exclusive_owner = Owner} - }) when Owner =/= none -> - {stop, normal, {ok, BQ:len(BQS)}, State}; - -handle_call(delete_exclusive, _From, State) -> - reply({error, not_exclusive}, State); - handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), @@ -868,6 +858,9 @@ handle_cast({reject, AckTags, Requeue, ChPid}, handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); +handle_cast(delete_immediately, State) -> + {stop, normal, State}; + handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 19150fa9..1af213c4 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -33,29 +33,35 @@ -include("rabbit.hrl"). -export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). --export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([list_for_source/1, list_for_destination/1, + list_for_source_and_destination/2]). +-export([new_deletions/0, combine_deletions/2, add_deletion/3, + process_deletions/1]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx --export([has_for_exchange/1, remove_for_exchange/1, - remove_for_queue/1, remove_transient_for_queue/1]). +-export([has_for_source/1, remove_for_source/1, + remove_for_destination/1, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([key/0]). +-export_type([key/0, deletions/0]). -type(key() :: binary()). --type(bind_errors() :: rabbit_types:error('queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found')). +-type(bind_errors() :: rabbit_types:error('source_not_found' | + 'destination_not_found' | + 'source_and_destination_not_found')). -type(bind_res() :: 'ok' | bind_errors()). -type(inner_fun() :: - fun((rabbit_types:exchange(), queue()) -> + fun((rabbit_types:exchange(), + rabbit_types:exchange() | rabbit_types:amqqueue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). +-opaque(deletions() :: dict:dictionary()). + -spec(recover/0 :: () -> [rabbit_types:binding()]). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). -spec(add/1 :: (rabbit_types:binding()) -> bind_res()). @@ -65,10 +71,13 @@ -spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). --spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). --spec(list_for_exchange_and_queue/2 :: - (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_source/1 :: + (rabbit_types:binding_source()) -> bindings()). +-spec(list_for_destination/1 :: + (rabbit_types:binding_destination()) -> bindings()). +-spec(list_for_source_and_destination/2 :: + (rabbit_types:binding_source(), rabbit_types:binding_destination()) -> + bindings()). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). -spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> @@ -76,18 +85,27 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). -spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) -> [[rabbit_types:info()]]). --spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). --spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(remove_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). --spec(remove_transient_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). +-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). +-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). +-spec(remove_for_destination/1 :: + (rabbit_types:binding_destination()) -> deletions()). +-spec(remove_transient_for_destination/1 :: + (rabbit_types:binding_destination()) -> deletions()). +-spec(process_deletions/1 :: (deletions()) -> 'ok'). +-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). +-spec(add_deletion/3 :: (rabbit_exchange:name(), + {'undefined' | rabbit_types:binding_source(), + 'deleted' | 'not_deleted', + deletions()}, deletions()) -> deletions()). +-spec(new_deletions/0 :: () -> deletions()). -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). +-define(INFO_KEYS, [source_name, source_kind, + destination_name, destination_kind, + routing_key, arguments]). recover() -> rabbit_misc:table_fold( @@ -101,36 +119,34 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end). + fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end). -add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). +add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). -remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). +remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). add(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (Src, Dst, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else - case InnerFun(X, Q) of + case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> Durable = (X#exchange.durable andalso - Q#amqqueue.durable), - ok = sync_binding( - B, Durable, + [] -> ok = sync_binding( + B, all_durable([Src, Dst]), fun mnesia:write/3), - {new, X, B}; - [_] -> {existing, X, B} + {new, Src, B}; + [_] -> {existing, Src, B} end; {error, _} = E -> E end end) of - {new, X = #exchange{ type = Type }, B} -> - ok = (type_to_module(Type)):add_binding(X, B), + {new, Src = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(Src, B), rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; @@ -141,61 +157,55 @@ add(Binding, InnerFun) -> remove(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (Src, Dst, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> {error, binding_not_found}; - [_] -> case InnerFun(X, Q) of - ok -> - Durable = (X#exchange.durable andalso - Q#amqqueue.durable), - ok = sync_binding( - B, Durable, - fun mnesia:delete_object/3), - Deleted = - rabbit_exchange:maybe_auto_delete(X), - {{Deleted, X}, B}; - {error, _} = E -> - E - end + [] -> + {error, binding_not_found}; + [_] -> + case InnerFun(Src, Dst) of + ok -> + ok = sync_binding( + B, all_durable([Src, Dst]), + fun mnesia:delete_object/3), + {ok, + maybe_auto_delete(B#binding.source, + [B], new_deletions())}; + {error, _} = E -> + E + end end end) of {error, _} = Err -> Err; - {{IsDeleted, X = #exchange{ type = Type }}, B} -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> ok = Module:delete(X, [B]); - not_deleted -> ok = Module:remove_bindings(X, [B]) - end, - rabbit_event:notify(binding_deleted, info(B)), - ok + {ok, Deletions} -> + ok = process_deletions(Deletions) end. list(VHostPath) -> - Route = #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - queue_name = rabbit_misc:r(VHostPath, queue), - _ = '_'}, + VHostResource = rabbit_misc:r(VHostPath, '_'), + Route = #route{binding = #binding{source = VHostResource, + destination = VHostResource, + _ = '_'}, _ = '_'}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_exchange(XName) -> - Route = #route{binding = #binding{exchange_name = XName, _ = '_'}}, +list_for_source(SrcName) -> + Route = #route{binding = #binding{source = SrcName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_queue(QueueName) -> - Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, +list_for_destination(DstName) -> + Route = #route{binding = #binding{destination = DstName, _ = '_'}}, [reverse_binding(B) || #reverse_route{reverse_binding = B} <- mnesia:dirty_match_object(rabbit_reverse_route, reverse_route(Route))]. -list_for_exchange_and_queue(XName, QueueName) -> - Route = #route{binding = #binding{exchange_name = XName, - queue_name = QueueName, - _ = '_'}}, +list_for_source_and_destination(SrcName, DstName) -> + Route = #route{binding = #binding{source = SrcName, + destination = DstName, + _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. @@ -208,10 +218,12 @@ map(VHostPath, F) -> infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. -i(exchange_name, #binding{exchange_name = XName}) -> XName; -i(queue_name, #binding{queue_name = QName}) -> QName; -i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; -i(arguments, #binding{args = Arguments}) -> Arguments; +i(source_name, #binding{source = SrcName}) -> SrcName#resource.name; +i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind; +i(destination_name, #binding{destination = DstName}) -> DstName#resource.name; +i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). info(B = #binding{}) -> infos(?INFO_KEYS, B). @@ -222,14 +234,14 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). -has_for_exchange(XName) -> - Match = #route{binding = #binding{exchange_name = XName, _ = '_'}}, +has_for_source(SrcName) -> + Match = #route{binding = #binding{source = SrcName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). -remove_for_exchange(XName) -> +remove_for_source(SrcName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), @@ -237,26 +249,31 @@ remove_for_exchange(XName) -> Route#route.binding end || Route <- mnesia:match_object( rabbit_route, - #route{binding = #binding{exchange_name = XName, - _ = '_'}}, + #route{binding = #binding{source = SrcName, + _ = '_'}}, write)]. -remove_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_forward_routes/1). +remove_for_destination(DstName) -> + remove_for_destination(DstName, fun delete_forward_routes/1). -remove_transient_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_transient_forward_routes/1). +remove_transient_for_destination(DstName) -> + remove_for_destination(DstName, fun delete_transient_forward_routes/1). %%---------------------------------------------------------------------------- -binding_action(Binding = #binding{exchange_name = XName, - queue_name = QueueName, - args = Arguments}, Fun) -> - call_with_exchange_and_queue( - XName, QueueName, - fun (X, Q) -> +all_durable(Resources) -> + lists:all(fun (#exchange{durable = D}) -> D; + (#amqqueue{durable = D}) -> D + end, Resources). + +binding_action(Binding = #binding{source = SrcName, + destination = DstName, + args = Arguments}, Fun) -> + call_with_source_and_destination( + SrcName, DstName, + fun (Src, Dst) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), - Fun(X, Q, Binding#binding{args = SortedArgs}) + Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> @@ -270,17 +287,22 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_queue(XName, QueueName, Fun) -> +call_with_source_and_destination(SrcName, DstName, Fun) -> + SrcTable = table_for_resource(SrcName), + DstTable = table_for_resource(DstName), rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, XName}), - mnesia:read({rabbit_queue, QueueName})} of - {[X], [Q]} -> Fun(X, Q); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, queue_not_found}; - {[ ], [ ]} -> {error, exchange_and_queue_not_found} - end + fun () -> case {mnesia:read({SrcTable, SrcName}), + mnesia:read({DstTable, DstName})} of + {[Src], [Dst]} -> Fun(Src, Dst); + {[], [_] } -> {error, source_not_found}; + {[_], [] } -> {error, destination_not_found}; + {[], [] } -> {error, source_and_destination_not_found} + end end). +table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; +table_for_resource(#resource{kind = queue}) -> rabbit_queue. + %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), @@ -293,8 +315,8 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -remove_for_queue(QueueName, FwdDeleteFun) -> - DeletedBindings = +remove_for_destination(DstName, FwdDeleteFun) -> + Bindings = [begin Route = reverse_route(ReverseRoute), ok = FwdDeleteFun(Route), @@ -304,40 +326,41 @@ remove_for_queue(QueueName, FwdDeleteFun) -> end || ReverseRoute <- mnesia:match_object( rabbit_reverse_route, - reverse_route(#route{binding = #binding{ - queue_name = QueueName, - _ = '_'}}), + reverse_route(#route{ + binding = #binding{ + destination = DstName, + _ = '_'}}), write)], - Grouped = group_bindings_and_auto_delete( - lists:keysort(#binding.exchange_name, DeletedBindings), []), - fun () -> - lists:foreach( - fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> Module:delete(X, Bs); - not_deleted -> Module:remove_bindings(X, Bs) - end - end, Grouped) - end. + group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), + lists:keysort(#binding.source, Bindings)). %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. -group_bindings_and_auto_delete([], Acc) -> +group_bindings_fold(_Fun, Acc, []) -> Acc; -group_bindings_and_auto_delete( - [B = #binding{exchange_name = XName} | Bs], Acc) -> - group_bindings_and_auto_delete(XName, Bs, [B], Acc). - -group_bindings_and_auto_delete( - XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) -> - group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc); -group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) -> - %% either Removed is [], or its head has a non-matching XName - [X] = mnesia:read({rabbit_exchange, XName}), - NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], - group_bindings_and_auto_delete(Removed, NewAcc). +group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B]). + +group_bindings_fold( + Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]); +group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> + %% Either Removed is [], or its head has a non-matching SrcName. + group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). + +maybe_auto_delete(XName, Bindings, Deletions) -> + case rabbit_exchange:lookup(XName) of + {error, not_found} -> + add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); + {ok, X} -> + add_deletion(XName, {X, not_deleted, Bindings}, + case rabbit_exchange:maybe_auto_delete(X) of + not_deleted -> Deletions; + {deleted, Deletions1} -> combine_deletions( + Deletions, Deletions1) + end) + end. delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -358,20 +381,59 @@ reverse_route(#route{binding = Binding}) -> reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. -reverse_binding(#reverse_binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}) -> - #binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}; - -reverse_binding(#binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}) -> - #reverse_binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}. +reverse_binding(#reverse_binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}) -> + #binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}; + +reverse_binding(#binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}) -> + #reverse_binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}. + +%% ---------------------------------------------------------------------------- +%% Binding / exchange deletion abstraction API +%% ---------------------------------------------------------------------------- + +anything_but( NotThis, NotThis, NotThis) -> NotThis; +anything_but( NotThis, NotThis, This) -> This; +anything_but( NotThis, This, NotThis) -> This; +anything_but(_NotThis, This, This) -> This. + +new_deletions() -> dict:new(). + +add_deletion(XName, Entry, Deletions) -> + dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end, + Entry, Deletions). + +combine_deletions(Deletions1, Deletions2) -> + dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end, + Deletions1, Deletions2). + +merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> + {anything_but(undefined, X1, X2), + anything_but(not_deleted, Deleted1, Deleted2), + [Bindings1 | Bindings2]}. + +process_deletions(Deletions) -> + dict:fold( + fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) -> + FlatBindings = lists:flatten(Bindings), + [rabbit_event:notify(binding_deleted, info(B)) || + B <- FlatBindings], + TypeModule = type_to_module(Type), + case Deleted of + not_deleted -> TypeModule:remove_bindings(X, FlatBindings); + deleted -> rabbit_event:notify(exchange_deleted, + [{name, X#exchange.name}]), + TypeModule:delete(X, FlatBindings) + end + end, ok, Deletions). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fe36cef9..d11b0684 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -728,6 +728,24 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; +handle_method(#'exchange.bind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + nowait = NoWait, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:add/2, + SourceNameBin, exchange, DestinationNameBin, RoutingKey, + Arguments, #'exchange.bind_ok'{}, NoWait, State); + +handle_method(#'exchange.unbind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + nowait = NoWait, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:remove/2, + SourceNameBin, exchange, DestinationNameBin, RoutingKey, + Arguments, #'exchange.unbind_ok'{}, NoWait, State); + handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = Durable, @@ -819,7 +837,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, nowait = NoWait, arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:add/2, - ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); handle_method(#'queue.unbind'{queue = QueueNameBin, @@ -827,7 +845,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> binding_action(fun rabbit_binding:remove/2, - ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); handle_method(#'queue.purge'{queue = QueueNameBin, @@ -877,14 +895,14 @@ handle_method(#'channel.flow'{active = false}, _, undefined -> start_limiter(State); Other -> Other end, + State1 = State#ch{limiter_pid = LimiterPid1}, ok = rabbit_limiter:block(LimiterPid1), - QPids = consumer_queues(Consumers), - Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids], - ok = rabbit_amqqueue:flush_all(QPids, self()), - case Queues of - [] -> {reply, #'channel.flow_ok'{active = false}, State}; - _ -> {noreply, State#ch{limiter_pid = LimiterPid1, - blocking = dict:from_list(Queues)}} + case consumer_queues(Consumers) of + [] -> {reply, #'channel.flow_ok'{active = false}, State1}; + QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || + QPid <- QPids], + ok = rabbit_amqqueue:flush_all(QPids, self()), + {noreply, State1#ch{blocking = dict:from_list(Queues)}} end; handle_method(_MethodRecord, _Content, _State) -> @@ -893,42 +911,48 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, NoWait, +binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, + RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid}) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! - QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_write_permitted(QueueName, State), - ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, - State), + DestinationName = + case DestinationType of + queue -> expand_queue_name_shortcut(DestinationNameBin, State); + exchange -> rabbit_misc:r(VHostPath, exchange, DestinationNameBin) + end, + check_write_permitted(DestinationName, State), + ActualRoutingKey = + expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(#binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = ActualRoutingKey, - args = Arguments}, - fun (_X, Q) -> + case Fun(#binding{source = ExchangeName, + destination = DestinationName, + key = ActualRoutingKey, + args = Arguments}, + fun (_X, Q = #amqqueue{}) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} - end + end; + (_X, #exchange{}) -> + ok end) of - {error, exchange_not_found} -> + {error, source_not_found} -> rabbit_misc:not_found(ExchangeName); - {error, queue_not_found} -> - rabbit_misc:not_found(QueueName); - {error, exchange_and_queue_not_found} -> + {error, destination_not_found} -> + rabbit_misc:not_found(DestinationName); + {error, source_and_destination_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(QueueName)]); + rabbit_misc:rs(DestinationName)]); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(QueueName)]); + rabbit_misc:rs(DestinationName)]); {error, #amqp_error{} = Error} -> rabbit_misc:protocol_error(Error); ok -> return_ok(State, NoWait, ReturnMethod) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 57efe7cc..8facaf16 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -94,29 +94,29 @@ start() -> end, halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - error("invalid command '~s'", - [lists:flatten( - rabbit_misc:intersperse( - " ", [atom_to_list(Command) | Args]))]), + print_error("invalid command '~s'", + [lists:flatten( + rabbit_misc:intersperse( + " ", [atom_to_list(Command) | Args]))]), usage(); {error, Reason} -> - error("~p", [Reason]), + print_error("~p", [Reason]), halt(2); {badrpc, {'EXIT', Reason}} -> - error("~p", [Reason]), + print_error("~p", [Reason]), halt(2); {badrpc, Reason} -> - error("unable to connect to node ~w: ~w", [Node, Reason]), + print_error("unable to connect to node ~w: ~w", [Node, Reason]), print_badrpc_diagnostics(Node), halt(2); Other -> - error("~p", [Other]), + print_error("~p", [Other]), halt(2) end. fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). -error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). +print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). print_badrpc_diagnostics(Node) -> fmt_stderr("diagnostics:", []), @@ -257,7 +257,8 @@ action(list_exchanges, Node, Args, Opts, Inform) -> action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + ArgAtoms = default_if_empty(Args, [source_name, source_kind, + destination_name, destination_kind, routing_key, arguments]), display_info_list(rpc_call(Node, rabbit_binding, info_all, [VHostArg, ArgAtoms]), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 2a19d5b1..46564233 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -82,8 +82,9 @@ (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | rabbit_types:error('in_use')). --spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> - 'not_deleted' | 'auto_deleted'). +-spec(maybe_auto_delete/1:: + (rabbit_types:exchange()) + -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). -endif. @@ -99,11 +100,11 @@ recover() -> end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), recover_with_bindings( - lists:keysort(#binding.exchange_name, Bs), + lists:keysort(#binding.source, Bs), lists:keysort(#exchange.name, Xs), []). -recover_with_bindings([B = #binding{exchange_name = Name} | Rest], - Xs = [#exchange{name = Name} | _], +recover_with_bindings([B = #binding{source = XName} | Rest], + Xs = [#exchange{name = XName} | _], Bindings) -> recover_with_bindings(Rest, Xs, [B | Bindings]); recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> @@ -225,38 +226,44 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X, Delivery) -> - publish(X, [], Delivery). - -publish(X = #exchange{type = Type}, Seen, Delivery) -> - case (type_to_module(Type)):publish(X, Delivery) of - {_, []} = R -> - #exchange{name = XName, arguments = Args} = X, - case rabbit_misc:r_arg(XName, exchange, Args, - <<"alternate-exchange">>) of - undefined -> - R; - AName -> - NewSeen = [XName | Seen], - case lists:member(AName, NewSeen) of - true -> R; - false -> case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end - end - end; - R -> - R +publish(X = #exchange{name = XName}, Delivery) -> + rabbit_router:deliver( + route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}), + Delivery). + +route(Delivery, {WorkList, SeenXs, QNames}) -> + case queue:out(WorkList) of + {empty, _WorkList} -> + lists:usort(QNames); + {{value, X = #exchange{type = Type}}, WorkList1} -> + DstNames = process_alternate( + X, ((type_to_module(Type)):route(X, Delivery))), + route(Delivery, + lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, + DstNames)) end. +process_alternate(#exchange{name = XName, arguments = Args}, []) -> + case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of + undefined -> []; + AName -> [AName] + end; +process_alternate(_X, Results) -> + Results. + +process_route(#resource{kind = exchange} = XName, + {WorkList, SeenXs, QNames} = Acc) -> + case sets:is_element(XName, SeenXs) of + true -> Acc; + false -> {case lookup(XName) of + {ok, X} -> queue:in(X, WorkList); + {error, not_found} -> WorkList + end, sets:add_element(XName, SeenXs), QNames} + end; +process_route(#resource{kind = queue} = QName, + {WorkList, SeenXs, QNames}) -> + {WorkList, SeenXs, [QName | QNames]}. + call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of @@ -271,9 +278,10 @@ delete(XName, IfUnused) -> false -> fun unconditional_delete/1 end, case call_with_exchange(XName, Fun) of - {deleted, X = #exchange{type = Type}, Bs} -> - (type_to_module(Type)):delete(X, Bs), - ok; + {deleted, X, Bs, Deletions} -> + ok = rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions)); Error = {error, _InUseOrNotFound} -> Error end. @@ -282,19 +290,18 @@ maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; maybe_auto_delete(#exchange{auto_delete = true} = X) -> case conditional_delete(X) of - {error, in_use} -> not_deleted; - {deleted, X, []} -> auto_deleted + {error, in_use} -> not_deleted; + {deleted, X, [], Deletions} -> {deleted, Deletions} end. conditional_delete(X = #exchange{name = XName}) -> - case rabbit_binding:has_for_exchange(XName) of + case rabbit_binding:has_for_source(XName) of false -> unconditional_delete(X); true -> {error, in_use} end. unconditional_delete(X = #exchange{name = XName}) -> - Bindings = rabbit_binding:remove_for_exchange(XName), ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), - rabbit_event:notify(exchange_deleted, [{name, XName}]), - {deleted, X, Bindings}. + Bindings = rabbit_binding:remove_for_source(XName), + {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 85760edc..742944dc 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -36,7 +36,7 @@ behaviour_info(callbacks) -> [ {description, 0}, - {publish, 2}, + {route, 2}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} {validate, 1}, diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 4f6eb851..d934a497 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -34,7 +34,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -50,10 +50,9 @@ description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery = - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), - Delivery). +route(#exchange{name = Name}, + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:match_routing_key(Name, RoutingKey). validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 94798c78..77ca9686 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -34,7 +34,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -50,8 +50,8 @@ description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). +route(#exchange{name = Name}, _Delivery) -> + rabbit_router:match_routing_key(Name, '_'). validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 0a59a175..ec9e7ba4 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -56,17 +56,14 @@ description() -> [{name, <<"headers">>}, {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, - Delivery = #delivery{message = #basic_message{content = Content}}) -> +route(#exchange{name = Name}, + #delivery{message = #basic_message{content = Content}}) -> Headers = case (Content#content.properties)#'P_basic'.headers of undefined -> []; H -> rabbit_misc:sort_field_table(H) end, - rabbit_router:deliver(rabbit_router:match_bindings( - Name, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end), - Delivery). + rabbit_router:match_bindings( + Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end). default_headers_match_kind() -> all. @@ -79,7 +76,7 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort -%% (rabbit_misc:sort_field_table) that publish/1 and +%% (rabbit_misc:sort_field_table) that route/1 and %% rabbit_binding:{add,remove}/2 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index e796acf3..d3ecdd4d 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -34,7 +34,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, publish/2]). +-export([description/0, route/2]). -export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -58,13 +58,12 @@ description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -publish(#exchange{name = Name}, Delivery = +route(#exchange{name = Name}, #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_bindings( - Name, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end), - Delivery). + rabbit_router:match_bindings(Name, + fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end). split_topic_key(Key) -> string:tokens(binary_to_list(Key), "."). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d35adf16..577d206d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -218,13 +218,15 @@ table_definitions() -> {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. binding_match() -> - #binding{queue_name = queue_name_match(), - exchange_name = exchange_name_match(), + #binding{source = exchange_name_match(), + destination = binding_destination_match(), _='_'}. reverse_binding_match() -> - #reverse_binding{queue_name = queue_name_match(), - exchange_name = exchange_name_match(), + #reverse_binding{destination = binding_destination_match(), + source = exchange_name_match(), _='_'}. +binding_destination_match() -> + resource_match('_'). exchange_name_match() -> resource_match(exchange). queue_name_match() -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 81d3c501..66cc06cf 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,9 +33,9 @@ -behaviour(gen_server2). --export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/2, client_terminate/2, - client_delete_and_terminate/3, successfully_recovered_state/1]). +-export([start_link/4, successfully_recovered_state/1, + client_init/2, client_terminate/2, client_delete_and_terminate/3, + write/4, read/3, contains/2, remove/2, release/2, sync/3]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -123,6 +123,11 @@ -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). +-spec(successfully_recovered_state/1 :: (server()) -> boolean()). +-spec(client_init/2 :: (server(), binary()) -> client_msstate()). +-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). +-spec(client_delete_and_terminate/3 :: + (client_msstate(), server(), binary()) -> 'ok'). -spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> rabbit_types:ok(client_msstate())). -spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> @@ -131,15 +136,11 @@ -spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). -spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). -spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). + +-spec(sync/1 :: (server()) -> 'ok'). -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). --spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). --spec(successfully_recovered_state/1 :: (server()) -> boolean()). - -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), {ets:tid(), file:filename(), atom(), any()}) -> 'concurrent_readers' | non_neg_integer()). @@ -308,6 +309,31 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> [Server, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). +successfully_recovered_state(Server) -> + gen_server2:call(Server, successfully_recovered_state, infinity). + +client_init(Server, Ref) -> + {IState, IModule, Dir, GCPid, + FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = + gen_server2:call(Server, {new_client_state, Ref}, infinity), + #client_msstate { file_handle_cache = dict:new(), + index_state = IState, + index_module = IModule, + dir = Dir, + gc_pid = GCPid, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }. + +client_terminate(CState, Server) -> + close_all_handles(CState), + ok = gen_server2:call(Server, client_terminate, infinity). + +client_delete_and_terminate(CState, Server, Ref) -> + close_all_handles(CState), + ok = gen_server2:cast(Server, {client_delete, Ref}). + write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), @@ -345,7 +371,9 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). release(_Server, []) -> ok; release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). -sync(Server) -> gen_server2:cast(Server, sync). %% internal + +sync(Server) -> + gen_server2:cast(Server, sync). gc_done(Server, Reclaimed, Source, Destination) -> gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). @@ -353,31 +381,6 @@ gc_done(Server, Reclaimed, Source, Destination) -> set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). -client_init(Server, Ref) -> - {IState, IModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), - #client_msstate { file_handle_cache = dict:new(), - index_state = IState, - index_module = IModule, - dir = Dir, - gc_pid = GCPid, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }. - -client_terminate(CState, Server) -> - close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). - -client_delete_and_terminate(CState, Server, Ref) -> - close_all_handles(CState), - ok = gen_server2:cast(Server, {client_delete, Ref}). - -successfully_recovered_state(Server) -> - gen_server2:call(Server, successfully_recovered_state, infinity). - %%---------------------------------------------------------------------------- %% Client-side-only helpers %%---------------------------------------------------------------------------- @@ -577,8 +580,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - {new_client_state, _Ref} -> 7; successfully_recovered_state -> 7; + {new_client_state, _Ref} -> 7; {read, _Guid} -> 2; _ -> 0 end. @@ -591,13 +594,8 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -handle_call({read, Guid}, From, State) -> - State1 = read_message(Guid, From, State), - noreply(State1); - -handle_call({contains, Guid}, From, State) -> - State1 = contains_message(Guid, From, State), - noreply(State1); +handle_call(successfully_recovered_state, _From, State) -> + reply(State #msstate.successfully_recovered, State); handle_call({new_client_state, CRef}, _From, State = #msstate { dir = Dir, @@ -613,11 +611,21 @@ handle_call({new_client_state, CRef}, _From, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); -handle_call(successfully_recovered_state, _From, State) -> - reply(State #msstate.successfully_recovered, State); - handle_call(client_terminate, _From, State) -> - reply(ok, State). + reply(ok, State); + +handle_call({read, Guid}, From, State) -> + State1 = read_message(Guid, From, State), + noreply(State1); + +handle_call({contains, Guid}, From, State) -> + State1 = contains_message(Guid, From, State), + noreply(State1). + +handle_cast({client_delete, CRef}, + State = #msstate { client_refs = ClientRefs }) -> + noreply( + State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }); handle_cast({write, Guid}, State = #msstate { sum_valid_data = SumValid, @@ -712,12 +720,7 @@ handle_cast({gc_done, Reclaimed, Src, Dst}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); - -handle_cast({client_delete, CRef}, - State = #msstate { client_refs = ClientRefs }) -> - noreply( - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). + noreply(State). handle_info(timeout, State) -> noreply(internal_sync(State)); diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 5cfd6a5c..b48d0aa3 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -64,20 +64,20 @@ start() -> io:format("done.~n"), halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - error("invalid command '~s'", - [lists:flatten( - rabbit_misc:intersperse(" ", FullCommand))]), + print_error("invalid command '~s'", + [lists:flatten( + rabbit_misc:intersperse(" ", FullCommand))]), usage(); timeout -> - error("timeout starting some nodes.", []), + print_error("timeout starting some nodes.", []), halt(1); Other -> - error("~p", [Other]), + print_error("~p", [Other]), halt(2) end end. -error(Format, Args) -> +print_error(Format, Args) -> rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). parse_args([Command | Args]) -> diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index c38ef8d2..88300ab4 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -72,7 +72,8 @@ start() -> %% applications along the way AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of {failed_to_load_app, App, Err} -> - error("failed to load application ~s:~n~p", [App, Err]); + terminate("failed to load application ~s:~n~p", + [App, Err]); AppList -> AppList end, @@ -116,19 +117,20 @@ start() -> end, ok; {error, Module, Error} -> - error("generation of boot script file ~s failed:~n~s", - [ScriptFile, Module:format_error(Error)]) + terminate("generation of boot script file ~s failed:~n~s", + [ScriptFile, Module:format_error(Error)]) end, case post_process_script(ScriptFile) of ok -> ok; {error, Reason} -> - error("post processing of boot script file ~s failed:~n~w", - [ScriptFile, Reason]) + terminate("post processing of boot script file ~s failed:~n~w", + [ScriptFile, Reason]) end, case systools:script2boot(RootName) of ok -> ok; - error -> error("failed to compile boot script file ~s", [ScriptFile]) + error -> terminate("failed to compile boot script file ~s", + [ScriptFile]) end, io:format("~w plugins activated:~n", [length(PluginApps)]), [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)]) @@ -190,11 +192,11 @@ unpack_ez_plugins(SrcDir, DestDir) -> %% Eliminate the contents of the destination directory case delete_recursively(DestDir) of ok -> ok; - {error, E} -> error("Could not delete dir ~s (~p)", [DestDir, E]) + {error, E} -> terminate("Could not delete dir ~s (~p)", [DestDir, E]) end, case filelib:ensure_dir(DestDir ++ "/") of ok -> ok; - {error, E2} -> error("Could not create dir ~s (~p)", [DestDir, E2]) + {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2]) end, [unpack_ez_plugin(PluginName, DestDir) || PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")]. @@ -261,6 +263,6 @@ process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> process_entry(Entry) -> [Entry]. -error(Fmt, Args) -> +terminate(Fmt, Args) -> io:format("ERROR: " ++ Fmt ++ "~n", Args), halt(1). diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 0b8efc8f..6ac402c8 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {queues}). +-record(state, {queues, delete_from}). -include("rabbit.hrl"). @@ -66,32 +66,39 @@ delete_all(CollectorPid) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #state{queues = dict:new()}}. + {ok, #state{queues = dict:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- handle_call({register, Q}, _From, - State = #state{queues = Queues}) -> + State = #state{queues = Queues, delete_from = Deleting}) -> MonitorRef = erlang:monitor(process, Q#amqqueue.pid), - {reply, ok, - State#state{queues = dict:store(MonitorRef, Q, Queues)}}; - -handle_call(delete_all, _From, State = #state{queues = Queues}) -> - [rabbit_misc:with_exit_handler( - fun () -> ok end, - fun () -> - erlang:demonitor(MonitorRef), - rabbit_amqqueue:delete_exclusive(Q) - end) - || {MonitorRef, Q} <- dict:to_list(Queues)], - {reply, ok, State}. + case Deleting of + undefined -> ok; + _ -> rabbit_amqqueue:delete_immediately(Q) + end, + {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}}; + +handle_call(delete_all, From, State = #state{queues = Queues, + delete_from = undefined}) -> + case dict:size(Queues) of + 0 -> {reply, ok, State#state{delete_from = From}}; + _ -> [rabbit_amqqueue:delete_immediately(Q) + || {_MRef, Q} <- dict:to_list(Queues)], + {noreply, State#state{delete_from = From}} + end. handle_cast(Msg, State) -> {stop, {unhandled_cast, Msg}, State}. handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, - State = #state{queues = Queues}) -> - {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}. + State = #state{queues = Queues, delete_from = Deleting}) -> + Queues1 = dict:erase(MonitorRef, Queues), + case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of + true -> gen_server:reply(Deleting, ok); + false -> ok + end, + {noreply, State#state{queues = Queues1}}. terminate(_Reason, _State) -> ok. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 39eac072..00df1ce1 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -39,26 +39,27 @@ -ifdef(use_specs). --export_type([routing_key/0, routing_result/0]). +-export_type([routing_key/0, routing_result/0, match_result/0]). -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -type(qpids() :: [pid()]). +-type(match_result() :: [rabbit_types:binding_destination()]). --spec(deliver/2 :: - (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). --spec(match_bindings/2 :: (rabbit_exchange:name(), +-spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). +-spec(match_bindings/2 :: (rabbit_types:binding_source(), fun ((rabbit_types:binding()) -> boolean())) -> - qpids()). --spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> - qpids()). + match_result()). +-spec(match_routing_key/2 :: (rabbit_types:binding_source(), + routing_key() | '_') -> match_result()). -endif. %%---------------------------------------------------------------------------- -deliver(QPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> +deliver(QNames, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver will deliver the message to the queue %% process asynchronously, and return true, which means all the @@ -66,11 +67,13 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% fire-and-forget cast here and return the QPids - the semantics %% is preserved. This scales much better than the non-immediate %% case below. + QPids = lookup_qpids(QNames), delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QPids, Delivery) -> +deliver(QNames, Delivery) -> + QPids = lookup_qpids(QNames), {Success, _} = delegate:invoke(QPids, fun (Pid) -> @@ -82,22 +85,23 @@ deliver(QPids, Delivery) -> {Routed, Handled}). %% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(Name, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = XName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - XName == Name, - Match(Binding)]), - lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). - -match_routing_key(Name, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', - key = RoutingKey, - _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). +%% TODO: This causes a full scan for each entry with the same source +match_bindings(SrcName, Match) -> + Query = qlc:q([DestinationName || + #route{binding = Binding = #binding{ + source = SrcName1, + destination = DestinationName}} <- + mnesia:table(rabbit_route), + SrcName == SrcName1, + Match(Binding)]), + mnesia:async_dirty(fun qlc:e/1, [Query]). + +match_routing_key(SrcName, RoutingKey) -> + MatchHead = #route{binding = #binding{source = SrcName, + destination = '$1', + key = RoutingKey, + _ = '_'}}, + mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]). %%-------------------------------------------------------------------- @@ -115,4 +119,4 @@ lookup_qpids(QNames) -> [#amqqueue{pid = QPid}] -> [QPid | QPids]; [] -> QPids end - end, [], lists:usort(QNames)). + end, [], QNames). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b36ee0be..1b47cdb7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1040,11 +1040,11 @@ test_server_status() -> %% list bindings ok = info_action(list_bindings, rabbit_binding:info_keys(), true), %% misc binding listing APIs - [_|_] = rabbit_binding:list_for_exchange( + [_|_] = rabbit_binding:list_for_source( rabbit_misc:r(<<"/">>, exchange, <<"">>)), - [_] = rabbit_binding:list_for_queue( + [_] = rabbit_binding:list_for_destination( rabbit_misc:r(<<"/">>, queue, <<"foo">>)), - [_] = rabbit_binding:list_for_exchange_and_queue( + [_] = rabbit_binding:list_for_source_and_destination( rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_misc:r(<<"/">>, queue, <<"foo">>)), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 0b6a15ec..b971a63f 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -39,9 +39,11 @@ delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, - binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, - user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, - ok_pid_or_error/0, channel_exit/0, connection_exit/0]). + binding/0, binding_source/0, binding_destination/0, + amqqueue/0, exchange/0, + connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, + ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, + connection_exit/0]). -type(channel_exit() :: no_return()). -type(connection_exit() :: no_return()). @@ -113,11 +115,14 @@ host :: rabbit_networking:hostname(), port :: rabbit_networking:ip_port()}). +-type(binding_source() :: rabbit_exchange:name()). +-type(binding_destination() :: rabbit_amqqueue:name() | rabbit_exchange:name()). + -type(binding() :: - #binding{exchange_name :: rabbit_exchange:name(), - queue_name :: rabbit_amqqueue:name(), - key :: rabbit_binding:key(), - args :: rabbit_framing:amqp_table()}). + #binding{source :: rabbit_exchange:name(), + destination :: binding_destination(), + key :: rabbit_binding:key(), + args :: rabbit_framing:amqp_table()}). -type(amqqueue() :: #amqqueue{name :: rabbit_amqqueue:name(), |