summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-01-27 17:00:12 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-01-27 17:00:12 +0000
commitaae40a595c494e8c2e4d4c153da95523417a5c13 (patch)
tree85fb821b38799e5b96778f69d84153cc76e9bbc5
parentfee05b3269836ae5d142e23c85931172160dc1b1 (diff)
parent722d463e1bcae4e07ef51f91a32a71bbe44195ab (diff)
downloadrabbitmq-server-aae40a595c494e8c2e4d4c153da95523417a5c13.tar.gz
Merged bug24664 into default
-rw-r--r--docs/html-to-website-xml.xsl44
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl12
-rw-r--r--packaging/macports/Portfile.in4
-rw-r--r--src/mirrored_supervisor.erl23
-rw-r--r--src/rabbit.erl18
-rw-r--r--src/rabbit_access_control.erl1
-rw-r--r--src/rabbit_alarm.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_binding.erl4
-rw-r--r--src/rabbit_client_sup.erl5
-rw-r--r--src/rabbit_control.erl14
-rw-r--r--src/rabbit_exchange.erl18
-rw-r--r--src/rabbit_exchange_type_invalid.erl47
-rw-r--r--src/rabbit_exchange_type_topic.erl120
-rw-r--r--src/rabbit_log.erl33
-rw-r--r--src/rabbit_mnesia.erl13
-rw-r--r--src/rabbit_networking.erl155
-rw-r--r--src/rabbit_plugins.erl14
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_restartable_sup.erl3
-rw-r--r--src/rabbit_ssl.erl26
-rw-r--r--src/rabbit_types.erl14
-rw-r--r--src/rabbit_upgrade_functions.erl8
-rw-r--r--src/rabbit_writer.erl2
-rw-r--r--src/supervisor2.erl4
-rw-r--r--src/tcp_acceptor_sup.erl6
-rw-r--r--src/tcp_listener.erl14
-rw-r--r--src/tcp_listener_sup.erl12
-rw-r--r--src/worker_pool.erl7
-rw-r--r--src/worker_pool_worker.erl10
31 files changed, 347 insertions, 296 deletions
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index 88aa2e78..d83d5073 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -8,8 +8,6 @@
<xsl:output method="xml" />
-<xsl:template match="*"/>
-
<!-- Copy every element through -->
<xsl:template match="*">
<xsl:element name="{name()}" namespace="http://www.w3.org/1999/xhtml">
@@ -28,36 +26,30 @@
<head>
<title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title>
</head>
- <body>
- <doc:div>
- <xsl:choose>
+ <body show-in-this-page="true">
+ <xsl:choose>
<xsl:when test="document($original)/refentry/refmeta/manvolnum">
- <p>
- This is the manual page for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
- </p>
- <p>
- <a href="../manpages.html">See a list of all manual pages</a>.
- </p>
+ <p>
+ This is the manual page for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
+ </p>
+ <p>
+ <a href="../manpages.html">See a list of all manual pages</a>.
+ </p>
</xsl:when>
<xsl:otherwise>
- <p>
- This is the documentation for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
- </p>
+ <p>
+ This is the documentation for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
+ </p>
</xsl:otherwise>
- </xsl:choose>
- <p>
+ </xsl:choose>
+ <p>
For more general documentation, please see the
- <a href="../admin-guide.html">administrator's guide</a>.
- </p>
-
- <doc:toc class="compact">
- <doc:heading>Table of Contents</doc:heading>
- </doc:toc>
+ <a href="../admin-guide.html">administrator's guide</a>.
+ </p>
- <xsl:apply-templates select="body/div[@class='refentry']"/>
- </doc:div>
+ <xsl:apply-templates select="body/div[@class='refentry']"/>
</body>
</html>
</xsl:template>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 5ead1051..9301af6b 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -42,5 +42,6 @@
{reuseaddr, true},
{backlog, 128},
{nodelay, true},
+ {linger, {true, 0}},
{exit_on_close, false}]}
]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index a603886c..c38eca7c 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -56,9 +56,11 @@
-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
+-record(topic_trie_node, {trie_node, edge_count, binding_count}).
-record(topic_trie_edge, {trie_edge, node_id}).
-record(topic_trie_binding, {trie_binding, value = const}).
+-record(trie_node, {exchange_name, node_id}).
-record(trie_edge, {exchange_name, node_id, word}).
-record(trie_binding, {exchange_name, node_id, destination}).
@@ -96,13 +98,3 @@
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
-
--ifdef(debug).
--define(LOGDEBUG0(F), rabbit_log:debug(F)).
--define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
--define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)).
--else.
--define(LOGDEBUG0(F), ok).
--define(LOGDEBUG(F,A), ok).
--define(LOGMESSAGE(D,C,M,Co), ok).
--endif.
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index 820197e7..360fb394 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -62,6 +62,10 @@ use_parallel_build yes
build.env-append HOME=${workpath}
+build.env-append VERSION=${version}
+
+destroot.env-append VERSION=${version}
+
destroot.target install_bin
destroot.destdir \
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 6e8f96d9..3ba8f50d 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -144,32 +144,17 @@
-type child() :: pid() | 'undefined'.
-type child_id() :: term().
--type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}.
-type modules() :: [module()] | 'dynamic'.
--type restart() :: 'permanent' | 'transient' | 'temporary'.
--type shutdown() :: 'brutal_kill' | timeout().
-type worker() :: 'worker' | 'supervisor'.
-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
-type sup_ref() :: (Name :: atom())
| {Name :: atom(), Node :: node()}
| {'global', Name :: atom()}
| pid().
--type child_spec() :: {Id :: child_id(),
- StartFunc :: mfargs(),
- Restart :: restart(),
- Shutdown :: shutdown(),
- Type :: worker(),
- Modules :: modules()}.
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
--type startchild_err() :: 'already_present'
- | {'already_started', Child :: child()} | term().
--type startchild_ret() :: {'ok', Child :: child()}
- | {'ok', Child :: child(), Info :: term()}
- | {'error', startchild_err()}.
-
-type group_name() :: any().
-spec start_link(GroupName, Module, Args) -> startlink_ret() when
@@ -183,9 +168,9 @@
Module :: module(),
Args :: term().
--spec start_child(SupRef, ChildSpec) -> startchild_ret() when
+-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when
SupRef :: sup_ref(),
- ChildSpec :: child_spec() | (List :: [term()]).
+ ChildSpec :: supervisor:child_spec() | (List :: [term()]).
-spec restart_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
@@ -215,12 +200,12 @@
Modules :: modules().
-spec check_childspecs(ChildSpecs) -> Result when
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: 'ok' | {'error', Error :: term()}.
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: startlink_ret().
-spec create_tables() -> Result when
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 267987e7..3dcd4938 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -132,7 +132,7 @@
-rabbit_boot_step({recovery,
[{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
- {requires, empty_db_check},
+ {requires, core_initialized},
{enables, routing_ready}]}).
-rabbit_boot_step({mirror_queue_slave_sup,
@@ -158,8 +158,9 @@
{enables, networking}]}).
-rabbit_boot_step({direct_client,
- [{mfa, {rabbit_direct, boot, []}},
- {requires, log_relay}]}).
+ [{description, "direct client"},
+ {mfa, {rabbit_direct, boot, []}},
+ {requires, log_relay}]}).
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
@@ -441,8 +442,7 @@ run_boot_step({StepName, Attributes}) ->
[try
apply(M,F,A)
catch
- _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n",
- [Reason, erlang:get_stacktrace()])
+ _:Reason -> boot_step_error(Reason, erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
@@ -501,8 +501,14 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+boot_step_error(Reason, Stacktrace) ->
+ boot_error("Error description:~n ~p~n~n"
+ "Log files (may contain more information):~n ~s~n ~s~n~n"
+ "Stack trace:~n ~p~n~n",
+ [Reason, log_location(kernel), log_location(sasl), Stacktrace]).
+
boot_error(Format, Args) ->
- io:format("BOOT ERROR: " ++ Format, Args),
+ io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
error_logger:error_msg(Format, Args),
timer:sleep(1000),
exit({?MODULE, failure_during_boot}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index ca28d686..ec9affa6 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) ->
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
- ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
check_access(
fun() ->
rabbit_vhost:exists(VHostPath) andalso
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index fd03ca85..517dd4ec 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -31,10 +31,9 @@
-ifdef(use_specs).
--type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
+-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 989a8011..c21db21b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -115,7 +115,6 @@ info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
init(Q) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
State = #q{q = Q#amqqueue{pid = self()},
@@ -135,7 +134,6 @@ init(Q) ->
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
RateTRef, AckTags, Deliveries, MTC) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -1114,8 +1112,7 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
- {stop, normal, State};
+ true -> {stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1163,7 +1160,6 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(Info, State) ->
- ?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 0d221b05..655bbb73 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -354,8 +354,8 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
%% For bulk operations we lock the tables we are operating on in order
%% to reduce the time complexity. Without the table locks we end up
-%% with num_tables*num_bulk_bindings row-level locks. Takiing each
-%% lock takes time proportional to the number of existing locks, thus
+%% with num_tables*num_bulk_bindings row-level locks. Taking each lock
+%% takes time proportional to the number of existing locks, thus
%% resulting in O(num_bulk_bindings^2) complexity.
%%
%% The locks need to be write locks since ultimately we end up
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index dfb400e3..4ba01b4f 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -28,8 +28,9 @@
-ifdef(use_specs).
--spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()).
--spec(start_link/2 :: ({'local', atom()}, mfa()) ->
+-spec(start_link/1 :: (rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) ->
rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 20486af5..22b57b1a 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -79,6 +79,12 @@ start() ->
io:format(Format ++ " ...~n", Args1)
end
end,
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
+
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
case catch action(Command, Node, Args, Opts, Inform) of
@@ -88,9 +94,11 @@ start() ->
false -> io:format("...done.~n")
end,
rabbit_misc:quit(0);
- {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")]),
+ {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15
+ PrintInvalidCommandError(),
usage();
{'EXIT', {badarg, _}} ->
print_error("invalid parameter: ~p", [Args]),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a15b9be4..68c0d988 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -355,11 +355,21 @@ peek_serial(XName) ->
_ -> undefined
end.
+invalid_module(T) ->
+ rabbit_log:warning(
+ "Could not find exchange type ~s.~n", [T]),
+ put({xtype_to_module, T}, rabbit_exchange_type_invalid),
+ rabbit_exchange_type_invalid.
+
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
case get({xtype_to_module, T}) of
- undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T),
- put({xtype_to_module, T}, Module),
- Module;
- Module -> Module
+ undefined ->
+ case rabbit_registry:lookup_module(exchange, T) of
+ {ok, Module} -> put({xtype_to_module, T}, Module),
+ Module;
+ {error, not_found} -> invalid_module(T)
+ end;
+ Module ->
+ Module
end.
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
new file mode 100644
index 00000000..8f60f7d8
--- /dev/null
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -0,0 +1,47 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_type_invalid).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, serialise_events/0, route/2]).
+-export([validate/1, create/2, delete/3,
+ add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+description() ->
+ [{name, <<"invalid">>},
+ {description,
+ <<"Dummy exchange type, to be used when the intended one is not found.">>
+ }].
+
+serialise_events() -> false.
+
+route(#exchange{name = Name, type = Type}, _) ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "Cannot route message through ~s: exchange type ~s not found",
+ [rabbit_misc:rs(Name), Type]).
+
+validate(_X) -> ok.
+create(_Tx, _X) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 348655b1..91c7b5d3 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -52,6 +52,7 @@ validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(transaction, #exchange{name = X}, _Bs) ->
+ trie_remove_all_nodes(X),
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
ok;
@@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) ->
add_binding(none, _Exchange, _Binding) ->
ok.
-remove_bindings(transaction, #exchange{name = X}, Bs) ->
- %% The remove process is split into two distinct phases. In the
- %% first phase we gather the lists of bindings and edges to
- %% delete, then in the second phase we process all the
- %% deletions. This is to prevent interleaving of read/write
- %% operations in mnesia that can adversely affect performance.
- {ToDelete, Paths} =
- lists:foldl(
- fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) ->
- Path = [{FinalNode, _} | _] =
- follow_down_get_path(S, split_topic_key(K)),
- {[{FinalNode, D} | Acc],
- decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))}
- end, {[], gb_trees:empty()}, Bs),
-
- [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete],
- [trie_remove_edge(X, Parent, Node, W) ||
- {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
+remove_bindings(transaction, _X, Bs) ->
+ %% See rabbit_binding:lock_route_tables for the rationale for
+ %% taking table locks.
+ case Bs of
+ [_] -> ok;
+ _ -> [mnesia:lock({table, T}, write) ||
+ T <- [rabbit_topic_trie_node,
+ rabbit_topic_trie_edge,
+ rabbit_topic_trie_binding]]
+ end,
+ [begin
+ Path = [{FinalNode, _} | _] =
+ follow_down_get_path(X, split_topic_key(K)),
+ trie_remove_binding(X, FinalNode, D),
+ remove_path_if_empty(X, Path)
+ end || #binding{source = X, key = K, destination = D} <- Bs],
ok;
remove_bindings(none, _X, _Bs) ->
ok.
-maybe_add_path(_X, [{root, none}], PathAcc) ->
- PathAcc;
-maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) ->
- case gb_trees:is_defined(Node, PathAcc) of
- true -> PathAcc;
- false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node),
- trie_child_count(X, Node)}},
- PathAcc)
- end.
-
-decrement_bindings(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end,
- Path, PathAcc).
-
-decrement_edges(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end,
- Path, PathAcc).
-
-with_path_acc(_X, _Fun, [{root, none}], PathAcc) ->
- PathAcc;
-with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) ->
- {Parent, W, Counts} = gb_trees:get(Node, PathAcc),
- NewCounts = Fun(Counts),
- NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc),
- case NewCounts of
- {0, 0} -> decrement_edges(X, ParentPath,
- maybe_add_path(X, ParentPath, NewPathAcc));
- _ -> NewPathAcc
- end.
-
-
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
@@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
error -> {error, Acc, Words}
end.
+remove_path_if_empty(_, [{root, none}]) ->
+ ok;
+remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
+ case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X, node_id = Node}, write) of
+ [] -> trie_remove_edge(X, Parent, Node, W),
+ remove_path_if_empty(X, RestPath);
+ _ -> ok
+ end.
+
trie_child(X, Node, Word) ->
case mnesia:read({rabbit_topic_trie_edge,
#trie_edge{exchange_name = X,
@@ -199,10 +177,30 @@ trie_bindings(X, Node) ->
destination = '$1'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+trie_update_node_counts(X, Node, Field, Delta) ->
+ E = case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X,
+ node_id = Node}, write) of
+ [] -> #topic_trie_node{trie_node = #trie_node{
+ exchange_name = X,
+ node_id = Node},
+ edge_count = 0,
+ binding_count = 0};
+ [E0] -> E0
+ end,
+ case setelement(Field, E, element(Field, E) + Delta) of
+ #topic_trie_node{edge_count = 0, binding_count = 0} ->
+ ok = mnesia:delete_object(rabbit_topic_trie_node, E, write);
+ EN ->
+ ok = mnesia:write(rabbit_topic_trie_node, EN, write)
+ end.
+
trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
trie_edge_op(X, FromNode, ToNode, W, Op) ->
@@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) ->
write).
trie_add_binding(X, Node, D) ->
+ trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
trie_binding_op(X, Node, D, fun mnesia:write/3).
trie_remove_binding(X, Node, D) ->
+ trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
trie_binding_op(X, Node, D, Op) ->
@@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) ->
destination = D}},
write).
-trie_child_count(X, Node) ->
- count(rabbit_topic_trie_edge,
- #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-trie_binding_count(X, Node) ->
- count(rabbit_topic_trie_binding,
- #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-count(Table, Match) ->
- length(mnesia:match_object(Table, Match, read)).
+trie_remove_all_nodes(X) ->
+ remove_all(rabbit_topic_trie_node,
+ #topic_trie_node{trie_node = #trie_node{exchange_name = X,
+ _ = '_'},
+ _ = '_'}).
trie_remove_all_edges(X) ->
remove_all(rabbit_topic_trie_edge,
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 558e0957..8f58f848 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -23,8 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([debug/1, debug/2, message/4, info/1, info/2,
- warning/1, warning/2, error/1, error/2]).
+-export([info/1, info/2, warning/1, warning/2, error/1, error/2]).
-define(SERVER, ?MODULE).
@@ -33,8 +32,6 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(debug/1 :: (string()) -> 'ok').
--spec(debug/2 :: (string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
@@ -42,8 +39,6 @@
-spec(error/1 :: (string()) -> 'ok').
-spec(error/2 :: (string(), [any()]) -> 'ok').
--spec(message/4 :: (_,_,_,_) -> 'ok').
-
-endif.
%%----------------------------------------------------------------------------
@@ -51,16 +46,6 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-debug(Fmt) ->
- gen_server:cast(?SERVER, {debug, Fmt}).
-
-debug(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {debug, Fmt, Args}).
-
-message(Direction, Channel, MethodRecord, Content) ->
- gen_server:cast(?SERVER,
- {message, Direction, Channel, MethodRecord, Content}).
-
info(Fmt) ->
gen_server:cast(?SERVER, {info, Fmt}).
@@ -86,22 +71,6 @@ init([]) -> {ok, none}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({debug, Fmt}, State) ->
- io:format("debug:: "), io:format(Fmt),
- error_logger:info_msg("debug:: " ++ Fmt),
- {noreply, State};
-handle_cast({debug, Fmt, Args}, State) ->
- io:format("debug:: "), io:format(Fmt, Args),
- error_logger:info_msg("debug:: " ++ Fmt, Args),
- {noreply, State};
-handle_cast({message, Direction, Channel, MethodRecord, Content}, State) ->
- io:format("~s ch~p ~p~n",
- [case Direction of
- in -> "-->";
- out -> "<--" end,
- Channel,
- {MethodRecord, Content}]),
- {noreply, State};
handle_cast({info, Fmt}, State) ->
error_logger:info_msg(Fmt),
{noreply, State};
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c8c18843..bf997a6f 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -268,6 +268,11 @@ table_definitions() ->
{type, ordered_set},
{match, #reverse_route{reverse_binding = reverse_binding_match(),
_='_'}}]},
+ {rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, record_info(fields, topic_trie_node)},
+ {type, ordered_set},
+ {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
{rabbit_topic_trie_edge,
[{record_name, topic_trie_edge},
{attributes, record_info(fields, topic_trie_edge)},
@@ -314,12 +319,12 @@ reverse_binding_match() ->
_='_'}.
binding_destination_match() ->
resource_match('_').
+trie_node_match() ->
+ #trie_node{ exchange_name = exchange_name_match(), _='_'}.
trie_edge_match() ->
- #trie_edge{exchange_name = exchange_name_match(),
- _='_'}.
+ #trie_edge{ exchange_name = exchange_name_match(), _='_'}.
trie_binding_match() ->
- #trie_binding{exchange_name = exchange_name_match(),
- _='_'}.
+ #trie_binding{exchange_name = exchange_name_match(), _='_'}.
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 045ab89a..923967ea 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -24,7 +24,7 @@
close_connection/2, force_connection_event_refresh/0]).
%%used by TCP-based transports, e.g. STOMP adapter
--export([check_tcp_listener_address/2,
+-export([tcp_listener_addresses/1, tcp_listener_spec/6,
ensure_ssl/0, ssl_transform_fun/1]).
-export([tcp_listener_started/3, tcp_listener_stopped/3,
@@ -47,12 +47,16 @@
-export_type([ip_port/0, hostname/0]).
-type(hostname() :: inet:hostname()).
--type(ip_port() :: inet:ip_port()).
+-type(ip_port() :: inet:port_number()).
-type(family() :: atom()).
-type(listener_config() :: ip_port() |
{hostname(), ip_port()} |
{hostname(), ip_port(), family()}).
+-type(address() :: {inet:ip_address(), ip_port(), family()}).
+-type(name_prefix() :: atom()).
+-type(protocol() :: atom()).
+-type(label() :: string()).
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
@@ -76,8 +80,10 @@
-spec(force_connection_event_refresh/0 :: () -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(check_tcp_listener_address/2 :: (atom(), listener_config())
- -> [{inet:ip_address(), ip_port(), family(), atom()}]).
+-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]).
+-spec(tcp_listener_spec/6 ::
+ (name_prefix(), address(), [gen_tcp:listen_option()], protocol(),
+ label(), rabbit_types:mfargs()) -> supervisor:child_spec()).
-spec(ensure_ssl/0 :: () -> rabbit_types:infos()).
-spec(ssl_transform_fun/1 ::
(rabbit_types:infos())
@@ -140,39 +146,6 @@ start() ->
transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
-%% inet_parse:address takes care of ip string, like "0.0.0.0"
-%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
-%% and runs 'inet_gethost' port process for dns lookups.
-%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
-
-getaddr(Host, Family) ->
- case inet_parse:address(Host) of
- {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
- {error, _} -> gethostaddr(Host, Family)
- end.
-
-gethostaddr(Host, auto) ->
- Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
- case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
- [] -> host_lookup_error(Host, Lookups);
- IPs -> IPs
- end;
-
-gethostaddr(Host, Family) ->
- case inet:getaddr(Host, Family) of
- {ok, IPAddress} -> [{IPAddress, Family}];
- {error, Reason} -> host_lookup_error(Host, Reason)
- end.
-
-host_lookup_error(Host, Reason) ->
- error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
- throw({error, {invalid_host, Host, Reason}}).
-
-resolve_family({_,_,_,_}, auto) -> inet;
-resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
-resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
-resolve_family(_, F) -> F.
-
ensure_ssl() ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
@@ -201,31 +174,36 @@ ssl_transform_fun(SslOpts) ->
end
end.
-check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) ->
- check_tcp_listener_address_auto(NamePrefix, Port);
-
-check_tcp_listener_address(NamePrefix, {"auto", Port}) ->
+tcp_listener_addresses(Port) when is_integer(Port) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({"auto", Port}) ->
%% Variant to prevent lots of hacking around in bash and batch files
- check_tcp_listener_address_auto(NamePrefix, Port);
-
-check_tcp_listener_address(NamePrefix, {Host, Port}) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({Host, Port}) ->
%% auto: determine family IPv4 / IPv6 after converting to IP address
- check_tcp_listener_address(NamePrefix, {Host, Port, auto});
-
-check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) ->
- if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
- true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
- [Port]),
- throw({error, {invalid_port, Port}})
- end,
- [{IPAddress, Port, Family,
- rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} ||
- {IPAddress, Family} <- getaddr(Host, Family0)].
-
-check_tcp_listener_address_auto(NamePrefix, Port) ->
- lists:append([check_tcp_listener_address(NamePrefix, Listener) ||
+ tcp_listener_addresses({Host, Port, auto});
+tcp_listener_addresses({Host, Port, Family0})
+ when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) ->
+ [{IPAddress, Port, Family} ||
+ {IPAddress, Family} <- getaddr(Host, Family0)];
+tcp_listener_addresses({_Host, Port, _Family0}) ->
+ error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]),
+ throw({error, {invalid_port, Port}}).
+
+tcp_listener_addresses_auto(Port) ->
+ lists:append([tcp_listener_addresses(Listener) ||
Listener <- port_to_listeners(Port)]).
+tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
+ Protocol, Label, OnConnect) ->
+ {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
+ {tcp_listener_sup, start_link,
+ [IPAddress, Port, [Family | SocketOpts],
+ {?MODULE, tcp_listener_started, [Protocol]},
+ {?MODULE, tcp_listener_stopped, [Protocol]},
+ OnConnect, Label]},
+ transient, infinity, supervisor, [tcp_listener_sup]}.
+
start_tcp_listener(Listener) ->
start_listener(Listener, amqp, "TCP Listener",
{?MODULE, start_client, []}).
@@ -235,27 +213,26 @@ start_ssl_listener(Listener, SslOpts) ->
{?MODULE, start_ssl_client, [SslOpts]}).
start_listener(Listener, Protocol, Label, OnConnect) ->
- [start_listener0(Spec, Protocol, Label, OnConnect) ||
- Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ [start_listener0(Address, Protocol, Label, OnConnect) ||
+ Address <- tcp_listener_addresses(Listener)],
ok.
-start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
- {ok,_} = supervisor:start_child(
- rabbit_sup,
- {Name,
- {tcp_listener_sup, start_link,
- [IPAddress, Port, [Family | tcp_opts()],
- {?MODULE, tcp_listener_started, [Protocol]},
- {?MODULE, tcp_listener_stopped, [Protocol]},
- OnConnect, Label]},
- transient, infinity, supervisor, [tcp_listener_sup]}).
+start_listener0(Address, Protocol, Label, OnConnect) ->
+ Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
+ Protocol, Label, OnConnect),
+ case supervisor:start_child(rabbit_sup, Spec) of
+ {ok, _} -> ok;
+ {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
+ exit({could_not_start_tcp_listener,
+ {rabbit_misc:ntoa(IPAddress), Port}})
+ end.
stop_tcp_listener(Listener) ->
- [stop_tcp_listener0(Spec) ||
- Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ [stop_tcp_listener0(Address) ||
+ Address <- tcp_listener_addresses(Listener)],
ok.
-stop_tcp_listener0({IPAddress, Port, _Family, Name}) ->
+stop_tcp_listener0({IPAddress, Port, _Family}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
@@ -363,6 +340,38 @@ tcp_opts() ->
{ok, Opts} = application:get_env(rabbit, tcp_listen_options),
Opts.
+%% inet_parse:address takes care of ip string, like "0.0.0.0"
+%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
+%% and runs 'inet_gethost' port process for dns lookups.
+%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+getaddr(Host, Family) ->
+ case inet_parse:address(Host) of
+ {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
+ {error, _} -> gethostaddr(Host, Family)
+ end.
+
+gethostaddr(Host, auto) ->
+ Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
+ case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
+ [] -> host_lookup_error(Host, Lookups);
+ IPs -> IPs
+ end;
+
+gethostaddr(Host, Family) ->
+ case inet:getaddr(Host, Family) of
+ {ok, IPAddress} -> [{IPAddress, Family}];
+ {error, Reason} -> host_lookup_error(Host, Reason)
+ end.
+
+host_lookup_error(Host, Reason) ->
+ error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
+ throw({error, {invalid_host, Host, Reason}}).
+
+resolve_family({_,_,_,_}, auto) -> inet;
+resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
+resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
+resolve_family(_, F) -> F.
+
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 62c004f7..de2ba8ad 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -55,13 +55,20 @@ start() ->
CmdArgsAndOpts -> CmdArgsAndOpts
end,
Command = list_to_atom(Command0),
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
ok ->
rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")]),
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
+ PrintInvalidCommandError(),
usage();
{error, Reason} ->
print_error("~p", [Reason]),
@@ -325,6 +332,9 @@ lookup_plugins(Names, AllPlugins) ->
read_enabled_plugins(PluginsFile) ->
case rabbit_file:read_term_file(PluginsFile) of
{ok, [Plugins]} -> Plugins;
+ {ok, []} -> [];
+ {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
+ PluginsFile}});
{error, enoent} -> [];
{error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
PluginsFile, Reason}})
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 07a446e9..6e2ddedb 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -29,7 +29,7 @@
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
--define(CLOSING_TIMEOUT, 1).
+-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index cda3ccbe..1a08efed 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -28,7 +28,8 @@
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index e0defa9e..e524446e 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -21,7 +21,7 @@
-include_lib("public_key/include/public_key.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
--export([peer_cert_subject_item/2]).
+-export([peer_cert_subject_items/2]).
%%--------------------------------------------------------------------------
@@ -34,8 +34,8 @@
-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
-spec(peer_cert_subject/1 :: (certificate()) -> string()).
-spec(peer_cert_validity/1 :: (certificate()) -> string()).
--spec(peer_cert_subject_item/2 ::
- (certificate(), tuple()) -> string() | 'not_found').
+-spec(peer_cert_subject_items/2 ::
+ (certificate(), tuple()) -> [string()] | 'not_found').
-endif.
@@ -59,8 +59,8 @@ peer_cert_subject(Cert) ->
format_rdn_sequence(Subject)
end, Cert).
-%% Return a part of the certificate's subject.
-peer_cert_subject_item(Cert, Type) ->
+%% Return the parts of the certificate's subject.
+peer_cert_subject_items(Cert, Type) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
subject = Subject }}) ->
@@ -89,8 +89,8 @@ find_by_type(Type, {rdnSequence, RDNs}) ->
case [V || #'AttributeTypeAndValue'{type = T, value = V}
<- lists:flatten(RDNs),
T == Type] of
- [Val] -> format_asn1_value(Val);
- [] -> not_found
+ [] -> not_found;
+ L -> [format_asn1_value(V) || V <- L]
end.
%%--------------------------------------------------------------------------
@@ -150,9 +150,11 @@ escape_rdn_value([$ ], middle) ->
escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;;
C =:= $<; C =:= $>; C =:= $\\ ->
[$\\, C | escape_rdn_value(S, middle)];
-escape_rdn_value([C | S], middle) when C < 32 ; C =:= 127 ->
- %% only U+0000 needs escaping, but for display purposes it's handy
- %% to escape all non-printable chars
+escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 ->
+ %% Of ASCII characters only U+0000 needs escaping, but for display
+ %% purposes it's handy to escape all non-printable chars. All non-ASCII
+ %% characters get converted to UTF-8 sequences and then escaped. We've
+ %% already got a UTF-8 sequence here, so just escape it.
lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++
escape_rdn_value(S, middle);
escape_rdn_value([C | S], middle) ->
@@ -167,6 +169,10 @@ format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
Min1, Min2, S1, S2, $Z]}) ->
io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
[Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
+%% We appear to get an untagged value back for an ia5string
+%% (e.g. domainComponent).
+format_asn1_value(V) when is_list(V) ->
+ V;
format_asn1_value(V) ->
io_lib:format("~p", [V]).
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 2db960ac..ae2b5d3f 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -28,12 +28,9 @@
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
connection/0, protocol/0, user/0, internal_user/0,
- username/0, password/0, password_hash/0, ok/1, error/1,
- ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
- connection_exit/0]).
-
--type(channel_exit() :: no_return()).
--type(connection_exit() :: no_return()).
+ username/0, password/0, password_hash/0,
+ ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
+ channel_exit/0, connection_exit/0, mfargs/0]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -156,4 +153,9 @@
-type(ok_or_error2(A, B) :: ok(A) | error(B)).
-type(ok_pid_or_error() :: ok_or_error2(pid(), any())).
+-type(channel_exit() :: no_return()).
+-type(connection_exit() :: no_return()).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-endif. % use_specs
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index e0ca8cbb..f164035e 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -35,6 +35,7 @@
-rabbit_upgrade({gm, mnesia, []}).
-rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}).
-rabbit_upgrade({mirrored_supervisor, mnesia, []}).
+-rabbit_upgrade({topic_trie_node, mnesia, []}).
%% -------------------------------------------------------------------
@@ -54,6 +55,7 @@
-spec(gm/0 :: () -> 'ok').
-spec(exchange_scratch/0 :: () -> 'ok').
-spec(mirrored_supervisor/0 :: () -> 'ok').
+-spec(topic_trie_node/0 :: () -> 'ok').
-endif.
@@ -177,6 +179,12 @@ mirrored_supervisor() ->
[{record_name, mirrored_sup_childspec},
{attributes, [key, mirroring_pid, childspec]}]).
+topic_trie_node() ->
+ create(rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, [trie_node, edge_count, binding_count]},
+ {type, ordered_set}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 091b50e4..f6062e06 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -169,12 +169,10 @@ call(Pid, Msg) ->
%%---------------------------------------------------------------------------
assemble_frame(Channel, MethodRecord, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, none),
rabbit_binary_generator:build_simple_method_frame(
Channel, MethodRecord, Protocol).
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index f75da872..26ea502c 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -717,8 +717,8 @@ do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
ok;
{error, normal} ->
case Child#child.restart_type of
- permanent -> ReportError(normal);
- {permanent, _Delay} -> ReportError(normal);
+ permanent -> ReportError(normal, Child);
+ {permanent, _Delay} -> ReportError(normal, Child);
_ -> ok
end;
{error, OtherReason} ->
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index 4c835598..cb3dd02c 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -25,7 +25,11 @@
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
+-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()).
+
-endif.
%%----------------------------------------------------------------------------
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index ad2a0d02..e5db4c9f 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -28,9 +28,14 @@
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/8 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(),
- atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ integer(), atom(), mfargs(), mfargs(), string()) ->
+ rabbit_types:ok_pid_or_error()).
+
-endif.
%%--------------------------------------------------------------------
@@ -67,8 +72,9 @@ init({IPAddress, Port, SocketOpts,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
- "failed to start ~s on ~s:~p - ~p~n",
- [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]),
+ "failed to start ~s on ~s:~p - ~p (~s)~n",
+ [Label, rabbit_misc:ntoab(IPAddress), Port,
+ Reason, inet:format_error(Reason)]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 5bff5c27..74297b6d 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -26,12 +26,16 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/7 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
- mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ mfargs(), mfargs(), mfargs(), string()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(start_link/8 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
- mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ mfargs(), mfargs(), mfargs(), integer(), string()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 456ff39f..fcb07a16 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -37,10 +37,11 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
--spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
--spec(submit_async/1 ::
- (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
+-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
-spec(idle/1 :: (any()) -> 'ok').
-endif.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 78ab4df3..b42530e2 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -29,12 +29,12 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
--spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
--spec(submit_async/2 ::
- (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
--spec(run/1 :: (fun (() -> A)) -> A;
- ({atom(), atom(), [any()]}) -> any()).
+-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
+-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
+-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-endif.