diff options
author | Marek Majkowski <marek@rabbitmq.com> | 2011-01-05 14:01:05 +0000 |
---|---|---|
committer | Marek Majkowski <marek@rabbitmq.com> | 2011-01-05 14:01:05 +0000 |
commit | 8a8f97eab200246dd65e9c64971883fc5587ad50 (patch) | |
tree | a46ccc6ffcac1baa087e5156a4278ab8b5f4d5f1 | |
parent | 32313f3fedd4fd072a269932f1ba37b3fcfb28ab (diff) | |
parent | 02396f8d21a641b4640657e1203b7c2a343a1c73 (diff) | |
download | rabbitmq-server-bug20570.tar.gz |
default merged into bug20570bug20570
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 6 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 3 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/delegate.erl | 209 | ||||
-rw-r--r-- | src/delegate_sup.erl | 13 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 13 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 29 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 18 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 24 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 3 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 35 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 14 |
18 files changed, 206 insertions, 185 deletions
@@ -170,7 +170,7 @@ start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ - ./scripts/rabbitmq-server ; sleep 1 + ./scripts/rabbitmq-server; sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 35de1cea..01ddd4c1 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -866,6 +866,10 @@ <listitem><para>Whether the exchange will be deleted automatically when no longer used.</para></listitem> </varlistentry> <varlistentry> + <term>internal</term> + <listitem><para>Whether the exchange is internal, i.e. cannot be directly published to by a client.</para></listitem> + </varlistentry> + <varlistentry> <term>arguments</term> <listitem><para>Exchange arguments.</para></listitem> </varlistentry> @@ -1077,7 +1081,7 @@ <para role="example-prefix"> For example: </para> - <screen role="example">rabbitmqctl list_connections send_pend server_port</screen> + <screen role="example">rabbitmqctl list_connections send_pend port</screen> <para role="example"> This command displays the send queue size and server port for each connection. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 3888f198..25d630c0 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -31,4 +31,5 @@ {cluster_nodes, []}, {server_properties, []}, {collect_statistics, none}, - {auth_mechanisms, ['PLAIN', 'AMQPLAIN']} ]} ]}. + {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, + {delegate_count, 16}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index fccfad97..8c8e12a1 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -51,7 +51,7 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, arguments}). +-record(exchange, {name, type, durable, auto_delete, internal, arguments}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). diff --git a/src/delegate.erl b/src/delegate.erl index 11abe73b..10054e57 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -31,11 +31,9 @@ -module(delegate). --define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2). - -behaviour(gen_server2). --export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]). +-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,13 +42,16 @@ -ifdef(use_specs). --spec(start_link/2 :: - (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). --spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). +-spec(invoke/2 :: + ( pid(), fun ((pid()) -> A)) -> A; + ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], + [{pid(), term()}]}). --spec(process_count/0 :: () -> non_neg_integer()). +-spec(delegate_count/0 :: () -> non_neg_integer()). -endif. @@ -61,157 +62,113 @@ %%---------------------------------------------------------------------------- -start_link(Prefix, Hash) -> - gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []). +start_link(Num) -> + gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []). +invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> + Fun(Pid); invoke(Pid, Fun) when is_pid(Pid) -> - [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), - case Res of - {ok, Result, _} -> + case invoke([Pid], Fun) of + {[{Pid, Result}], []} -> Result; - {error, {Class, Reason, StackTrace}, _} -> + {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; invoke(Pids, Fun) when is_list(Pids) -> - lists:foldl( - fun ({Status, Result, Pid}, {Good, Bad}) -> - case Status of - ok -> {[{Pid, Result}|Good], Bad}; - error -> {Good, [{Pid, Result}|Bad]} - end + {LocalPids, Grouped} = group_pids_by_node(Pids), + %% The use of multi_call is only safe because the timeout is + %% infinity, and thus there is no process spawned in order to do + %% the sending. Thus calls can't overtake preceding calls/casts. + {Replies, BadNodes} = + case orddict:fetch_keys(Grouped) of + [] -> {[], []}; + RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(), + {invoke, Fun, Grouped}, + infinity) end, - {[], []}, - invoke_per_node(split_delegate_per_node(Pids), Fun)). + BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || + BadNode <- BadNodes, + Pid <- orddict:fetch(BadNode, Grouped)], + ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + [Results || {_Node, Results} <- Replies]]), + lists:foldl( + fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; + ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} + end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), +invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, Fun), %% we don't care about any error ok; +invoke_no_result(Pid, Fun) when is_pid(Pid) -> + invoke_no_result([Pid], Fun); invoke_no_result(Pids, Fun) when is_list(Pids) -> - invoke_no_result_per_node(split_delegate_per_node(Pids), Fun), + {LocalPids, Grouped} = group_pids_by_node(Pids), + case orddict:fetch_keys(Grouped) of + [] -> ok; + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(), + {invoke, Fun, Grouped}) + end, + safe_invoke(LocalPids, Fun), %% must not die ok. %%---------------------------------------------------------------------------- -internal_call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity). - -internal_cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). - -split_delegate_per_node(Pids) -> +group_pids_by_node(Pids) -> LocalNode = node(), - {Local, Remote} = - lists:foldl( - fun (Pid, {L, D}) -> - Node = node(Pid), - case Node of - LocalNode -> {[Pid|L], D}; - _ -> {L, orddict:append(Node, Pid, D)} - end - end, - {[], orddict:new()}, Pids), - {Local, orddict:to_list(Remote)}. - -invoke_per_node(NodePids, Fun) -> - lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). - -invoke_no_result_per_node(NodePids, Fun) -> - delegate_per_node(NodePids, Fun, fun internal_cast/2), - ok. - -delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> - %% In the case where DelegateFun is internal_cast, the safe_invoke - %% is not actually async! However, in practice Fun will always be - %% something that does a gen_server:cast or similar, so I don't - %% think it's a problem unless someone misuses this - %% function. Making this *actually* async would be painful as we - %% can't spawn at this point or we break effect ordering. - [safe_invoke(LocalPids, Fun)| - delegate_per_remote_node(NodePids, Fun, DelegateFun)]. - -delegate_per_remote_node(NodePids, Fun, DelegateFun) -> - Self = self(), - %% Note that this is unsafe if the Fun requires reentrancy to the - %% local_server. I.e. if self() == local_server(Node) then we'll - %% block forever. - [gen_server2:cast( - local_server(Node), - {thunk, fun () -> - Self ! {result, - DelegateFun( - Node, fun () -> safe_invoke(Pids, Fun) end)} - end}) || {Node, Pids} <- NodePids], - [receive {result, Result} -> Result end || _ <- NodePids]. - -local_server(Node) -> - case get({delegate_local_server_name, Node}) of - undefined -> - Name = server(outgoing, - erlang:phash2({self(), Node}, process_count())), - put({delegate_local_server_name, Node}, Name), - Name; - Name -> Name - end. - -remote_server(Node) -> - case get({delegate_remote_server_name, Node}) of - undefined -> - case rpc:call(Node, delegate, process_count, []) of - {badrpc, _} -> - %% Have to return something, if we're just casting - %% then we don't want to blow up - server(incoming, 1); - Count -> - Name = server(incoming, - erlang:phash2({self(), Node}, Count)), - put({delegate_remote_server_name, Node}, Name), - Name - end; - Name -> Name + lists:foldl( + fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode -> + {[Pid | Local], Remote}; + (Pid, {Local, Remote}) -> + {Local, + orddict:update( + node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} + end, {[], orddict:new()}, Pids). + +delegate_count() -> + {ok, Count} = application:get_env(rabbit, delegate_count), + Count. + +delegate_name(Hash) -> + list_to_atom("delegate_" ++ integer_to_list(Hash)). + +delegate() -> + case get(delegate) of + undefined -> Name = delegate_name( + erlang:phash2(self(), delegate_count())), + put(delegate, Name), + Name; + Name -> Name end. -server(Prefix, Hash) -> - list_to_atom("delegate_" ++ - atom_to_list(Prefix) ++ "_" ++ - integer_to_list(Hash)). - safe_invoke(Pids, Fun) when is_list(Pids) -> [safe_invoke(Pid, Fun) || Pid <- Pids]; safe_invoke(Pid, Fun) when is_pid(Pid) -> try - {ok, Fun(Pid), Pid} - catch - Class:Reason -> - {error, {Class, Reason, erlang:get_stacktrace()}, Pid} + {ok, Pid, Fun(Pid)} + catch Class:Reason -> + {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. -process_count() -> - ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers). - -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- init([]) -> - {ok, no_state, hibernate, + {ok, node(), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -%% We don't need a catch here; we always go via safe_invoke. A catch here would -%% be the wrong thing anyway since the Thunk can throw multiple errors. -handle_call({thunk, Thunk}, _From, State) -> - {reply, Thunk(), State, hibernate}. +handle_call({invoke, Fun, Grouped}, _From, Node) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}. -handle_cast({thunk, Thunk}, State) -> - Thunk(), - {noreply, State, hibernate}. +handle_cast({invoke, Fun, Grouped}, Node) -> + safe_invoke(orddict:fetch(Node, Grouped), Fun), + {noreply, Node, hibernate}. -handle_info(_Info, State) -> - {noreply, State, hibernate}. +handle_info(_Info, Node) -> + {noreply, Node, hibernate}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- +code_change(_OldVsn, Node, _Extra) -> + {ok, Node}. diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 544546f1..d2af72af 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -55,11 +55,8 @@ start_link() -> %%---------------------------------------------------------------------------- init(_Args) -> - {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}. - -specs(Prefix) -> - [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]}, - transient, 16#ffffffff, worker, [delegate]} || - Hash <- lists:seq(0, delegate:process_count() - 1)]. - -%%---------------------------------------------------------------------------- + DCount = delegate:delegate_count(), + {ok, {{one_for_one, 10, 10}, + [{Num, {delegate, start_link, [Num]}, + transient, 16#ffffffff, worker, [delegate]} || + Num <- lists:seq(0, DCount - 1)]}}. diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 2bc946db..51adbac8 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -290,7 +290,7 @@ add_vhost(VHostPath) -> write), [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, []) || + Type, true, false, false, []) || {Name,Type} <- [{<<"">>, direct}, {<<"amq.direct">>, direct}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 71fd7a17..35ed1c94 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -503,19 +503,17 @@ pseudo_queue(QueueName, Pid) -> pid = Pid}. safe_delegate_call_ok(F, Pids) -> - {_, Bad} = delegate:invoke(Pids, - fun (Pid) -> + case delegate:invoke(Pids, fun (Pid) -> rabbit_misc:with_exit_handler( fun () -> ok end, fun () -> F(Pid) end) - end), - case Bad of - [] -> ok; - _ -> {error, Bad} + end) of + {_, []} -> ok; + {_, Bad} -> {error, Bad} end. delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_cast(Pid, Msg) -> - delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). + delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 523f7c5e..981dd31d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -200,7 +200,7 @@ terminate_shutdown(Fun, State) -> BQSN1 end, BQS, all_ch_record()), [emit_consumer_deleted(Ch, CTag) - || {CTag, Ch, _} <- consumers(State1)], + || {Ch, CTag, _} <- consumers(State1)], rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4a4636ea..73ecd1b4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -409,6 +409,13 @@ check_user_id_header(#'P_basic'{user_id = Claimed}, #ch{username = Actual}) -> precondition_failed, "user_id property set to '~s' but " "authenticated user was '~s'", [Claimed, Actual]). +check_internal_exchange(#exchange{name = Name, internal = true}) -> + rabbit_misc:protocol_error(access_refused, + "cannot publish to internal ~s", + [rabbit_misc:rs(Name)]); +check_internal_exchange(_) -> + ok. + expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); @@ -549,6 +556,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + check_internal_exchange(Exchange), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), @@ -782,7 +790,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = false, durable = Durable, auto_delete = AutoDelete, - internal = false, + internal = Internal, nowait = NoWait, arguments = Args}, _, State = #ch{virtual_host = VHostPath}) -> @@ -805,10 +813,11 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, CheckedType, Durable, AutoDelete, + Internal, Args) end, ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, - AutoDelete, Args), + AutoDelete, Internal, Args), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 42861f86..dd009c83 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -49,7 +49,7 @@ boot() -> init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, []), + topic, true, false, false, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7414c904..a95cf0b1 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -33,11 +33,11 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, +-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). --export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]). +-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- @@ -49,13 +49,14 @@ -type(type() :: atom()). -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: - (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table()) +-spec(declare/6 :: + (name(), type(), boolean(), boolean(), boolean(), + rabbit_framing:amqp_table()) -> rabbit_types:exchange()). -spec(check_type/1 :: (binary()) -> atom() | rabbit_types:connection_exit()). --spec(assert_equivalence/5 :: - (rabbit_types:exchange(), atom(), boolean(), boolean(), +-spec(assert_equivalence/6 :: + (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) -> 'ok' | rabbit_types:connection_exit()). -spec(assert_args_equivalence/2 :: @@ -90,7 +91,7 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, arguments]). +-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). recover() -> Xs = rabbit_misc:table_fold( @@ -113,11 +114,12 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(XName, Type, Durable, AutoDelete, Args) -> +declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = #exchange{name = XName, type = Type, durable = Durable, auto_delete = AutoDelete, + internal = Internal, arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return @@ -170,14 +172,16 @@ check_type(TypeBin) -> assert_equivalence(X = #exchange{ durable = Durable, auto_delete = AutoDelete, + internal = Internal, type = Type}, - Type, Durable, AutoDelete, RequiredArgs) -> + Type, Durable, AutoDelete, Internal, RequiredArgs) -> (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); -assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, - _Args) -> +assert_equivalence(#exchange{ name = Name }, + _Type, _Durable, _Internal, _AutoDelete, _Args) -> rabbit_misc:protocol_error( precondition_failed, - "cannot redeclare ~s with different type, durable or autodelete value", + "cannot redeclare ~s with different type, durable, " + "internal or autodelete value", [rabbit_misc:rs(Name)]). assert_args_equivalence(#exchange{ name = Name, arguments = Args }, @@ -215,6 +219,7 @@ i(name, #exchange{name = Name}) -> Name; i(type, #exchange{type = Type}) -> Type; i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; +i(internal, #exchange{internal = Internal}) -> Internal; i(arguments, #exchange{arguments = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 52d76ac4..06ba319b 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -344,8 +344,8 @@ throw_on_error(E, Thunk) -> with_exit_handler(Handler, Thunk) -> try Thunk() - catch - exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown -> + catch exit:{R, _} when R =:= noproc; R =:= nodedown; + R =:= normal; R =:= shutdown -> Handler() end. @@ -589,19 +589,19 @@ sort_field_table(Arguments) -> pid_to_string(Pid) when is_pid(Pid) -> %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and %% 8.7) - <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> = term_to_binary(Pid), Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), - lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). + lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])). %% inverse of above string_to_pid(Str) -> Err = {error, {invalid_pid_syntax, Str}}, %% The \ before the trailing $ is only there to keep emacs %% font-lock from getting confused. - case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$", + case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$", [{capture,all_but_first,list}]) of - {match, [NodeStr, IdStr, SerStr]} -> + {match, [NodeStr, CreStr, IdStr, SerStr]} -> %% the NodeStr atom might be quoted, so we have to parse %% it rather than doing a simple list_to_atom NodeAtom = case erl_scan:string(NodeStr) of @@ -609,9 +609,9 @@ string_to_pid(Str) -> {error, _, _} -> throw(Err) end, <<131,NodeEnc/binary>> = term_to_binary(NodeAtom), - Id = list_to_integer(IdStr), - Ser = list_to_integer(SerStr), - binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); + [Cre, Id, Ser] = lists:map(fun list_to_integer/1, + [CreStr, IdStr, SerStr]), + binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>); nomatch -> throw(Err) end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index dadfc16e..11f5e410 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -34,7 +34,8 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, - is_clustered/0, empty_ram_only_tables/0, copy_db/1]). + is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, + empty_ram_only_tables/0, copy_db/1]). -export([table_names/0]). @@ -63,6 +64,8 @@ -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). -spec(is_clustered/0 :: () -> boolean()). +-spec(running_clustered_nodes/0 :: () -> [node()]). +-spec(all_clustered_nodes/0 :: () -> [node()]). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). @@ -81,12 +84,12 @@ status() -> Nodes = nodes_of_type(CopyType), Nodes =/= [] end]; - no -> case mnesia:system_info(db_nodes) of + no -> case all_clustered_nodes() of [] -> []; Nodes -> [{unknown, Nodes}] end end}, - {running_nodes, mnesia:system_info(running_db_nodes)}]. + {running_nodes, running_clustered_nodes()}]. init() -> ok = ensure_mnesia_running(), @@ -127,9 +130,15 @@ reset() -> reset(false). force_reset() -> reset(true). is_clustered() -> - RunningNodes = mnesia:system_info(running_db_nodes), + RunningNodes = running_clustered_nodes(), [node()] /= RunningNodes andalso [] /= RunningNodes. +all_clustered_nodes() -> + mnesia:system_info(db_nodes). + +running_clustered_nodes() -> + mnesia:system_info(running_db_nodes). + empty_ram_only_tables() -> Node = node(), lists:foreach( @@ -372,8 +381,7 @@ init_db(ClusterNodes, Force) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir), - mnesia:system_info(db_nodes)} of + case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of {[], true, [_]} -> %% True single disc node, attempt upgrade ok = wait_for_tables(), @@ -566,8 +574,8 @@ reset(Force) -> {Nodes, RunningNodes} = try ok = init(), - {mnesia:system_info(db_nodes) -- [Node], - mnesia:system_info(running_db_nodes) -- [Node]} + {all_clustered_nodes() -- [Node], + running_clustered_nodes() -- [Node]} after mnesia:stop() end, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e8b4e8e2..2e1834c7 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -701,7 +701,7 @@ handle_cast({write, CRef, Guid}, {ok, _} -> dict:update(CRef, fun(Guids) -> gb_sets:add(Guid, Guids) end, - gb_sets:empty(), CTG); + gb_sets:singleton(Guid), CTG); error -> CTG end, State1 = State #msstate { cref_to_guids = CTG1 }, diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 1c542ffe..d5a9d73c 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -246,8 +246,9 @@ start_ssl_client(SslOpts, Sock) -> connections() -> [rabbit_connection_sup:reader(ConnSup) || + Node <- rabbit_mnesia:running_clustered_nodes(), {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup)]. + <- supervisor:which_children({rabbit_tcp_client_sup, Node})]. connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index adf968cb..eca748a9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -96,6 +96,22 @@ run_cluster_dependent_tests(SecondaryNode) -> passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), + %% we now run the tests remotely, so that code coverage on the + %% local node picks up more of the delegate + Node = node(), + Self = self(), + Remote = spawn(SecondaryNode, + fun () -> A = test_delegates_async(Node), + B = test_delegates_sync(Node), + Self ! {self(), {A, B}} + end), + receive + {Remote, Result} -> + Result = {passed, passed} + after 2000 -> + throw(timeout) + end, + passed. test_priority_queue() -> @@ -1247,15 +1263,26 @@ test_delegates_sync(SecondaryNode) -> true = lists:all(fun ({_, response}) -> true end, GoodRes), GoodResPids = [Pid || {Pid, _} <- GoodRes], - Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids), - Good = ordsets:from_list(GoodResPids), + Good = lists:usort(LocalGoodPids ++ RemoteGoodPids), + Good = lists:usort(GoodResPids), {[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender), true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes), BadResPids = [Pid || {Pid, _} <- BadRes], - Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids), - Bad = ordsets:from_list(BadResPids), + Bad = lists:usort(LocalBadPids ++ RemoteBadPids), + Bad = lists:usort(BadResPids), + + MagicalPids = [rabbit_misc:string_to_pid(Str) || + Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]], + {[], BadNodes} = delegate:invoke(MagicalPids, Sender), + true = lists:all( + fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end, + BadNodes), + BadNodesPids = [Pid || {Pid, _} <- BadNodes], + + Magical = lists:usort(MagicalPids), + Magical = lists:usort(BadNodesPids), passed. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 1c56d51d..7848c848 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -27,6 +27,7 @@ -rabbit_upgrade({remove_user_scope, []}). -rabbit_upgrade({hash_passwords, []}). -rabbit_upgrade({add_ip_to_listener, []}). +-rabbit_upgrade({internal_exchanges, []}). %% ------------------------------------------------------------------- @@ -35,6 +36,7 @@ -spec(remove_user_scope/0 :: () -> 'ok'). -spec(hash_passwords/0 :: () -> 'ok'). -spec(add_ip_to_listener/0 :: () -> 'ok'). +-spec(internal_exchanges/0 :: () -> 'ok'). -endif. @@ -71,6 +73,18 @@ add_ip_to_listener() -> end, [node, protocol, host, ip_address, port]). +internal_exchanges() -> + Tables = [rabbit_exchange, rabbit_durable_exchange], + AddInternalFun = + fun ({exchange, Name, Type, Durable, AutoDelete, Args}) -> + {exchange, Name, Type, Durable, AutoDelete, false, Args} + end, + [ ok = mnesia(T, + AddInternalFun, + [name, type, durable, auto_delete, internal, arguments]) + || T <- Tables ], + ok. + %%-------------------------------------------------------------------- mnesia(TableName, Fun, FieldList) -> |