summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarek Majkowski <marek@rabbitmq.com>2011-01-05 14:01:05 +0000
committerMarek Majkowski <marek@rabbitmq.com>2011-01-05 14:01:05 +0000
commit8a8f97eab200246dd65e9c64971883fc5587ad50 (patch)
treea46ccc6ffcac1baa087e5156a4278ab8b5f4d5f1
parent32313f3fedd4fd072a269932f1ba37b3fcfb28ab (diff)
parent02396f8d21a641b4640657e1203b7c2a343a1c73 (diff)
downloadrabbitmq-server-bug20570.tar.gz
default merged into bug20570bug20570
-rw-r--r--Makefile2
-rw-r--r--docs/rabbitmqctl.1.xml6
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/delegate.erl209
-rw-r--r--src/delegate_sup.erl13
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl13
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl29
-rw-r--r--src/rabbit_misc.erl18
-rw-r--r--src/rabbit_mnesia.erl24
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_networking.erl3
-rw-r--r--src/rabbit_tests.erl35
-rw-r--r--src/rabbit_upgrade_functions.erl14
18 files changed, 206 insertions, 185 deletions
diff --git a/Makefile b/Makefile
index e0d5744c..00bfd629 100644
--- a/Makefile
+++ b/Makefile
@@ -170,7 +170,7 @@ start-background-node:
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \
- ./scripts/rabbitmq-server ; sleep 1
+ ./scripts/rabbitmq-server; sleep 1
start-rabbit-on-node: all
echo "rabbit:start()." | $(ERL_CALL)
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 35de1cea..01ddd4c1 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -866,6 +866,10 @@
<listitem><para>Whether the exchange will be deleted automatically when no longer used.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>internal</term>
+ <listitem><para>Whether the exchange is internal, i.e. cannot be directly published to by a client.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>arguments</term>
<listitem><para>Exchange arguments.</para></listitem>
</varlistentry>
@@ -1077,7 +1081,7 @@
<para role="example-prefix">
For example:
</para>
- <screen role="example">rabbitmqctl list_connections send_pend server_port</screen>
+ <screen role="example">rabbitmqctl list_connections send_pend port</screen>
<para role="example">
This command displays the send queue size and server port for each
connection.
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 3888f198..25d630c0 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -31,4 +31,5 @@
{cluster_nodes, []},
{server_properties, []},
{collect_statistics, none},
- {auth_mechanisms, ['PLAIN', 'AMQPLAIN']} ]} ]}.
+ {auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
+ {delegate_count, 16}]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index fccfad97..8c8e12a1 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -51,7 +51,7 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, arguments}).
+-record(exchange, {name, type, durable, auto_delete, internal, arguments}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
arguments, pid}).
diff --git a/src/delegate.erl b/src/delegate.erl
index 11abe73b..10054e57 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -31,11 +31,9 @@
-module(delegate).
--define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2).
-
-behaviour(gen_server2).
--export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]).
+-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -44,13 +42,16 @@
-ifdef(use_specs).
--spec(start_link/2 ::
- (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 ::
+ (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
--spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
+-spec(invoke/2 ::
+ ( pid(), fun ((pid()) -> A)) -> A;
+ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
+ [{pid(), term()}]}).
--spec(process_count/0 :: () -> non_neg_integer()).
+-spec(delegate_count/0 :: () -> non_neg_integer()).
-endif.
@@ -61,157 +62,113 @@
%%----------------------------------------------------------------------------
-start_link(Prefix, Hash) ->
- gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []).
+start_link(Num) ->
+ gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ Fun(Pid);
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
- case Res of
- {ok, Result, _} ->
+ case invoke([Pid], Fun) of
+ {[{Pid, Result}], []} ->
Result;
- {error, {Class, Reason, StackTrace}, _} ->
+ {[], [{Pid, {Class, Reason, StackTrace}}]} ->
erlang:raise(Class, Reason, StackTrace)
end;
invoke(Pids, Fun) when is_list(Pids) ->
- lists:foldl(
- fun ({Status, Result, Pid}, {Good, Bad}) ->
- case Status of
- ok -> {[{Pid, Result}|Good], Bad};
- error -> {Good, [{Pid, Result}|Bad]}
- end
+ {LocalPids, Grouped} = group_pids_by_node(Pids),
+ %% The use of multi_call is only safe because the timeout is
+ %% infinity, and thus there is no process spawned in order to do
+ %% the sending. Thus calls can't overtake preceding calls/casts.
+ {Replies, BadNodes} =
+ case orddict:fetch_keys(Grouped) of
+ [] -> {[], []};
+ RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(),
+ {invoke, Fun, Grouped},
+ infinity)
end,
- {[], []},
- invoke_per_node(split_delegate_per_node(Pids), Fun)).
+ BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
+ BadNode <- BadNodes,
+ Pid <- orddict:fetch(BadNode, Grouped)],
+ ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) |
+ [Results || {_Node, Results} <- Replies]]),
+ lists:foldl(
+ fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
+ ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
+ end, {[], BadPids}, ResultsNoNode).
-invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
+invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ safe_invoke(Pid, Fun), %% we don't care about any error
ok;
+invoke_no_result(Pid, Fun) when is_pid(Pid) ->
+ invoke_no_result([Pid], Fun);
invoke_no_result(Pids, Fun) when is_list(Pids) ->
- invoke_no_result_per_node(split_delegate_per_node(Pids), Fun),
+ {LocalPids, Grouped} = group_pids_by_node(Pids),
+ case orddict:fetch_keys(Grouped) of
+ [] -> ok;
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(),
+ {invoke, Fun, Grouped})
+ end,
+ safe_invoke(LocalPids, Fun), %% must not die
ok.
%%----------------------------------------------------------------------------
-internal_call(Node, Thunk) when is_atom(Node) ->
- gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity).
-
-internal_cast(Node, Thunk) when is_atom(Node) ->
- gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
-
-split_delegate_per_node(Pids) ->
+group_pids_by_node(Pids) ->
LocalNode = node(),
- {Local, Remote} =
- lists:foldl(
- fun (Pid, {L, D}) ->
- Node = node(Pid),
- case Node of
- LocalNode -> {[Pid|L], D};
- _ -> {L, orddict:append(Node, Pid, D)}
- end
- end,
- {[], orddict:new()}, Pids),
- {Local, orddict:to_list(Remote)}.
-
-invoke_per_node(NodePids, Fun) ->
- lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-
-invoke_no_result_per_node(NodePids, Fun) ->
- delegate_per_node(NodePids, Fun, fun internal_cast/2),
- ok.
-
-delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
- %% In the case where DelegateFun is internal_cast, the safe_invoke
- %% is not actually async! However, in practice Fun will always be
- %% something that does a gen_server:cast or similar, so I don't
- %% think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- [safe_invoke(LocalPids, Fun)|
- delegate_per_remote_node(NodePids, Fun, DelegateFun)].
-
-delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
- Self = self(),
- %% Note that this is unsafe if the Fun requires reentrancy to the
- %% local_server. I.e. if self() == local_server(Node) then we'll
- %% block forever.
- [gen_server2:cast(
- local_server(Node),
- {thunk, fun () ->
- Self ! {result,
- DelegateFun(
- Node, fun () -> safe_invoke(Pids, Fun) end)}
- end}) || {Node, Pids} <- NodePids],
- [receive {result, Result} -> Result end || _ <- NodePids].
-
-local_server(Node) ->
- case get({delegate_local_server_name, Node}) of
- undefined ->
- Name = server(outgoing,
- erlang:phash2({self(), Node}, process_count())),
- put({delegate_local_server_name, Node}, Name),
- Name;
- Name -> Name
- end.
-
-remote_server(Node) ->
- case get({delegate_remote_server_name, Node}) of
- undefined ->
- case rpc:call(Node, delegate, process_count, []) of
- {badrpc, _} ->
- %% Have to return something, if we're just casting
- %% then we don't want to blow up
- server(incoming, 1);
- Count ->
- Name = server(incoming,
- erlang:phash2({self(), Node}, Count)),
- put({delegate_remote_server_name, Node}, Name),
- Name
- end;
- Name -> Name
+ lists:foldl(
+ fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode ->
+ {[Pid | Local], Remote};
+ (Pid, {Local, Remote}) ->
+ {Local,
+ orddict:update(
+ node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
+ end, {[], orddict:new()}, Pids).
+
+delegate_count() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ Count.
+
+delegate_name(Hash) ->
+ list_to_atom("delegate_" ++ integer_to_list(Hash)).
+
+delegate() ->
+ case get(delegate) of
+ undefined -> Name = delegate_name(
+ erlang:phash2(self(), delegate_count())),
+ put(delegate, Name),
+ Name;
+ Name -> Name
end.
-server(Prefix, Hash) ->
- list_to_atom("delegate_" ++
- atom_to_list(Prefix) ++ "_" ++
- integer_to_list(Hash)).
-
safe_invoke(Pids, Fun) when is_list(Pids) ->
[safe_invoke(Pid, Fun) || Pid <- Pids];
safe_invoke(Pid, Fun) when is_pid(Pid) ->
try
- {ok, Fun(Pid), Pid}
- catch
- Class:Reason ->
- {error, {Class, Reason, erlang:get_stacktrace()}, Pid}
+ {ok, Pid, Fun(Pid)}
+ catch Class:Reason ->
+ {error, Pid, {Class, Reason, erlang:get_stacktrace()}}
end.
-process_count() ->
- ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers).
-
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
init([]) ->
- {ok, no_state, hibernate,
+ {ok, node(), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-%% We don't need a catch here; we always go via safe_invoke. A catch here would
-%% be the wrong thing anyway since the Thunk can throw multiple errors.
-handle_call({thunk, Thunk}, _From, State) ->
- {reply, Thunk(), State, hibernate}.
+handle_call({invoke, Fun, Grouped}, _From, Node) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-handle_cast({thunk, Thunk}, State) ->
- Thunk(),
- {noreply, State, hibernate}.
+handle_cast({invoke, Fun, Grouped}, Node) ->
+ safe_invoke(orddict:fetch(Node, Grouped), Fun),
+ {noreply, Node, hibernate}.
-handle_info(_Info, State) ->
- {noreply, State, hibernate}.
+handle_info(_Info, Node) ->
+ {noreply, Node, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
+code_change(_OldVsn, Node, _Extra) ->
+ {ok, Node}.
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 544546f1..d2af72af 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -55,11 +55,8 @@ start_link() ->
%%----------------------------------------------------------------------------
init(_Args) ->
- {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}.
-
-specs(Prefix) ->
- [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]},
- transient, 16#ffffffff, worker, [delegate]} ||
- Hash <- lists:seq(0, delegate:process_count() - 1)].
-
-%%----------------------------------------------------------------------------
+ DCount = delegate:delegate_count(),
+ {ok, {{one_for_one, 10, 10},
+ [{Num, {delegate, start_link, [Num]},
+ transient, 16#ffffffff, worker, [delegate]} ||
+ Num <- lists:seq(0, DCount - 1)]}}.
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 2bc946db..51adbac8 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -290,7 +290,7 @@ add_vhost(VHostPath) ->
write),
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
+ Type, true, false, false, []) ||
{Name,Type} <-
[{<<"">>, direct},
{<<"amq.direct">>, direct},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 71fd7a17..35ed1c94 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -503,19 +503,17 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid}.
safe_delegate_call_ok(F, Pids) ->
- {_, Bad} = delegate:invoke(Pids,
- fun (Pid) ->
+ case delegate:invoke(Pids, fun (Pid) ->
rabbit_misc:with_exit_handler(
fun () -> ok end,
fun () -> F(Pid) end)
- end),
- case Bad of
- [] -> ok;
- _ -> {error, Bad}
+ end) of
+ {_, []} -> ok;
+ {_, Bad} -> {error, Bad}
end.
delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_cast(Pid, Msg) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
+ delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 523f7c5e..981dd31d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -200,7 +200,7 @@ terminate_shutdown(Fun, State) ->
BQSN1
end, BQS, all_ch_record()),
[emit_consumer_deleted(Ch, CTag)
- || {CTag, Ch, _} <- consumers(State1)],
+ || {Ch, CTag, _} <- consumers(State1)],
rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4a4636ea..73ecd1b4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -409,6 +409,13 @@ check_user_id_header(#'P_basic'{user_id = Claimed}, #ch{username = Actual}) ->
precondition_failed, "user_id property set to '~s' but "
"authenticated user was '~s'", [Claimed, Actual]).
+check_internal_exchange(#exchange{name = Name, internal = true}) ->
+ rabbit_misc:protocol_error(access_refused,
+ "cannot publish to internal ~s",
+ [rabbit_misc:rs(Name)]);
+check_internal_exchange(_) ->
+ ok.
+
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
@@ -549,6 +556,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+ check_internal_exchange(Exchange),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
@@ -782,7 +790,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = false,
durable = Durable,
auto_delete = AutoDelete,
- internal = false,
+ internal = Internal,
nowait = NoWait,
arguments = Args},
_, State = #ch{virtual_host = VHostPath}) ->
@@ -805,10 +813,11 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
CheckedType,
Durable,
AutoDelete,
+ Internal,
Args)
end,
ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
- AutoDelete, Args),
+ AutoDelete, Internal, Args),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 42861f86..dd009c83 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -49,7 +49,7 @@ boot() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ topic, true, false, false, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 7414c904..a95cf0b1 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -33,11 +33,11 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
+-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
%% this must be run inside a mnesia tx
-export([maybe_auto_delete/1]).
--export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
+-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
@@ -49,13 +49,14 @@
-type(type() :: atom()).
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 ::
- (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table())
+-spec(declare/6 ::
+ (name(), type(), boolean(), boolean(), boolean(),
+ rabbit_framing:amqp_table())
-> rabbit_types:exchange()).
-spec(check_type/1 ::
(binary()) -> atom() | rabbit_types:connection_exit()).
--spec(assert_equivalence/5 ::
- (rabbit_types:exchange(), atom(), boolean(), boolean(),
+-spec(assert_equivalence/6 ::
+ (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table())
-> 'ok' | rabbit_types:connection_exit()).
-spec(assert_args_equivalence/2 ::
@@ -90,7 +91,7 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments]).
+-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
recover() ->
Xs = rabbit_misc:table_fold(
@@ -113,11 +114,12 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
recover_with_bindings([], [], []) ->
ok.
-declare(XName, Type, Durable, AutoDelete, Args) ->
+declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = #exchange{name = XName,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
+ internal = Internal,
arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
@@ -170,14 +172,16 @@ check_type(TypeBin) ->
assert_equivalence(X = #exchange{ durable = Durable,
auto_delete = AutoDelete,
+ internal = Internal,
type = Type},
- Type, Durable, AutoDelete, RequiredArgs) ->
+ Type, Durable, AutoDelete, Internal, RequiredArgs) ->
(type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
-assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
- _Args) ->
+assert_equivalence(#exchange{ name = Name },
+ _Type, _Durable, _Internal, _AutoDelete, _Args) ->
rabbit_misc:protocol_error(
precondition_failed,
- "cannot redeclare ~s with different type, durable or autodelete value",
+ "cannot redeclare ~s with different type, durable, "
+ "internal or autodelete value",
[rabbit_misc:rs(Name)]).
assert_args_equivalence(#exchange{ name = Name, arguments = Args },
@@ -215,6 +219,7 @@ i(name, #exchange{name = Name}) -> Name;
i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
+i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 52d76ac4..06ba319b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -344,8 +344,8 @@ throw_on_error(E, Thunk) ->
with_exit_handler(Handler, Thunk) ->
try
Thunk()
- catch
- exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown ->
+ catch exit:{R, _} when R =:= noproc; R =:= nodedown;
+ R =:= normal; R =:= shutdown ->
Handler()
end.
@@ -589,19 +589,19 @@ sort_field_table(Arguments) ->
pid_to_string(Pid) when is_pid(Pid) ->
%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
%% 8.7)
- <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
= term_to_binary(Pid),
Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
+ lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])).
%% inverse of above
string_to_pid(Str) ->
Err = {error, {invalid_pid_syntax, Str}},
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
- case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
+ case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$",
[{capture,all_but_first,list}]) of
- {match, [NodeStr, IdStr, SerStr]} ->
+ {match, [NodeStr, CreStr, IdStr, SerStr]} ->
%% the NodeStr atom might be quoted, so we have to parse
%% it rather than doing a simple list_to_atom
NodeAtom = case erl_scan:string(NodeStr) of
@@ -609,9 +609,9 @@ string_to_pid(Str) ->
{error, _, _} -> throw(Err)
end,
<<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
- Id = list_to_integer(IdStr),
- Ser = list_to_integer(SerStr),
- binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
+ [Cre, Id, Ser] = lists:map(fun list_to_integer/1,
+ [CreStr, IdStr, SerStr]),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>);
nomatch ->
throw(Err)
end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index dadfc16e..11f5e410 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -34,7 +34,8 @@
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, force_cluster/1, reset/0, force_reset/0,
- is_clustered/0, empty_ram_only_tables/0, copy_db/1]).
+ is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
+ empty_ram_only_tables/0, copy_db/1]).
-export([table_names/0]).
@@ -63,6 +64,8 @@
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(is_clustered/0 :: () -> boolean()).
+-spec(running_clustered_nodes/0 :: () -> [node()]).
+-spec(all_clustered_nodes/0 :: () -> [node()]).
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
@@ -81,12 +84,12 @@ status() ->
Nodes = nodes_of_type(CopyType),
Nodes =/= []
end];
- no -> case mnesia:system_info(db_nodes) of
+ no -> case all_clustered_nodes() of
[] -> [];
Nodes -> [{unknown, Nodes}]
end
end},
- {running_nodes, mnesia:system_info(running_db_nodes)}].
+ {running_nodes, running_clustered_nodes()}].
init() ->
ok = ensure_mnesia_running(),
@@ -127,9 +130,15 @@ reset() -> reset(false).
force_reset() -> reset(true).
is_clustered() ->
- RunningNodes = mnesia:system_info(running_db_nodes),
+ RunningNodes = running_clustered_nodes(),
[node()] /= RunningNodes andalso [] /= RunningNodes.
+all_clustered_nodes() ->
+ mnesia:system_info(db_nodes).
+
+running_clustered_nodes() ->
+ mnesia:system_info(running_db_nodes).
+
empty_ram_only_tables() ->
Node = node(),
lists:foreach(
@@ -372,8 +381,7 @@ init_db(ClusterNodes, Force) ->
end;
true -> ok
end,
- case {Nodes, mnesia:system_info(use_dir),
- mnesia:system_info(db_nodes)} of
+ case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of
{[], true, [_]} ->
%% True single disc node, attempt upgrade
ok = wait_for_tables(),
@@ -566,8 +574,8 @@ reset(Force) ->
{Nodes, RunningNodes} =
try
ok = init(),
- {mnesia:system_info(db_nodes) -- [Node],
- mnesia:system_info(running_db_nodes) -- [Node]}
+ {all_clustered_nodes() -- [Node],
+ running_clustered_nodes() -- [Node]}
after
mnesia:stop()
end,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e8b4e8e2..2e1834c7 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -701,7 +701,7 @@ handle_cast({write, CRef, Guid},
{ok, _} -> dict:update(CRef, fun(Guids) ->
gb_sets:add(Guid, Guids)
end,
- gb_sets:empty(), CTG);
+ gb_sets:singleton(Guid), CTG);
error -> CTG
end,
State1 = State #msstate { cref_to_guids = CTG1 },
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 1c542ffe..d5a9d73c 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -246,8 +246,9 @@ start_ssl_client(SslOpts, Sock) ->
connections() ->
[rabbit_connection_sup:reader(ConnSup) ||
+ Node <- rabbit_mnesia:running_clustered_nodes(),
{_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup)].
+ <- supervisor:which_children({rabbit_tcp_client_sup, Node})].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index adf968cb..eca748a9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -96,6 +96,22 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
+ %% we now run the tests remotely, so that code coverage on the
+ %% local node picks up more of the delegate
+ Node = node(),
+ Self = self(),
+ Remote = spawn(SecondaryNode,
+ fun () -> A = test_delegates_async(Node),
+ B = test_delegates_sync(Node),
+ Self ! {self(), {A, B}}
+ end),
+ receive
+ {Remote, Result} ->
+ Result = {passed, passed}
+ after 2000 ->
+ throw(timeout)
+ end,
+
passed.
test_priority_queue() ->
@@ -1247,15 +1263,26 @@ test_delegates_sync(SecondaryNode) ->
true = lists:all(fun ({_, response}) -> true end, GoodRes),
GoodResPids = [Pid || {Pid, _} <- GoodRes],
- Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids),
- Good = ordsets:from_list(GoodResPids),
+ Good = lists:usort(LocalGoodPids ++ RemoteGoodPids),
+ Good = lists:usort(GoodResPids),
{[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender),
true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes),
BadResPids = [Pid || {Pid, _} <- BadRes],
- Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids),
- Bad = ordsets:from_list(BadResPids),
+ Bad = lists:usort(LocalBadPids ++ RemoteBadPids),
+ Bad = lists:usort(BadResPids),
+
+ MagicalPids = [rabbit_misc:string_to_pid(Str) ||
+ Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]],
+ {[], BadNodes} = delegate:invoke(MagicalPids, Sender),
+ true = lists:all(
+ fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end,
+ BadNodes),
+ BadNodesPids = [Pid || {Pid, _} <- BadNodes],
+
+ Magical = lists:usort(MagicalPids),
+ Magical = lists:usort(BadNodesPids),
passed.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 1c56d51d..7848c848 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -27,6 +27,7 @@
-rabbit_upgrade({remove_user_scope, []}).
-rabbit_upgrade({hash_passwords, []}).
-rabbit_upgrade({add_ip_to_listener, []}).
+-rabbit_upgrade({internal_exchanges, []}).
%% -------------------------------------------------------------------
@@ -35,6 +36,7 @@
-spec(remove_user_scope/0 :: () -> 'ok').
-spec(hash_passwords/0 :: () -> 'ok').
-spec(add_ip_to_listener/0 :: () -> 'ok').
+-spec(internal_exchanges/0 :: () -> 'ok').
-endif.
@@ -71,6 +73,18 @@ add_ip_to_listener() ->
end,
[node, protocol, host, ip_address, port]).
+internal_exchanges() ->
+ Tables = [rabbit_exchange, rabbit_durable_exchange],
+ AddInternalFun =
+ fun ({exchange, Name, Type, Durable, AutoDelete, Args}) ->
+ {exchange, Name, Type, Durable, AutoDelete, false, Args}
+ end,
+ [ ok = mnesia(T,
+ AddInternalFun,
+ [name, type, durable, auto_delete, internal, arguments])
+ || T <- Tables ],
+ ok.
+
%%--------------------------------------------------------------------
mnesia(TableName, Fun, FieldList) ->