diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-25 15:18:56 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-25 15:18:56 +0000 |
commit | a8b01410a9d90f0e178f6614b5c9f0c2e686fff0 (patch) | |
tree | 59ddd3f99e262a4f472c04ed7275b0dcbe863d44 | |
parent | 0f3554e2fa07f1412552bbea901edfd50f3b5828 (diff) | |
parent | 65efc39ab5e6669c4088e5d26d7885ba252dcf58 (diff) | |
download | rabbitmq-server-a8b01410a9d90f0e178f6614b5c9f0c2e686fff0.tar.gz |
Merge bug24435
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | packaging/macports/Portfile.in | 4 | ||||
-rw-r--r-- | src/mirrored_supervisor.erl | 23 | ||||
-rw-r--r-- | src/rabbit.erl | 7 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 3 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 4 | ||||
-rw-r--r-- | src/rabbit_client_sup.erl | 5 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 120 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 13 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 150 | ||||
-rw-r--r-- | src/rabbit_plugins.erl | 14 | ||||
-rw-r--r-- | src/rabbit_restartable_sup.erl | 3 | ||||
-rw-r--r-- | src/rabbit_types.erl | 14 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 8 | ||||
-rw-r--r-- | src/tcp_acceptor_sup.erl | 6 | ||||
-rw-r--r-- | src/tcp_listener.erl | 9 | ||||
-rw-r--r-- | src/tcp_listener_sup.erl | 12 | ||||
-rw-r--r-- | src/worker_pool.erl | 7 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 10 |
19 files changed, 219 insertions, 195 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index a603886c..d81b82db 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -56,9 +56,11 @@ -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). +-record(topic_trie_node, {trie_node, edge_count, binding_count}). -record(topic_trie_edge, {trie_edge, node_id}). -record(topic_trie_binding, {trie_binding, value = const}). +-record(trie_node, {exchange_name, node_id}). -record(trie_edge, {exchange_name, node_id, word}). -record(trie_binding, {exchange_name, node_id, destination}). diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index 820197e7..360fb394 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -62,6 +62,10 @@ use_parallel_build yes build.env-append HOME=${workpath} +build.env-append VERSION=${version} + +destroot.env-append VERSION=${version} + destroot.target install_bin destroot.destdir \ diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 6e8f96d9..3ba8f50d 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -144,32 +144,17 @@ -type child() :: pid() | 'undefined'. -type child_id() :: term(). --type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}. -type modules() :: [module()] | 'dynamic'. --type restart() :: 'permanent' | 'transient' | 'temporary'. --type shutdown() :: 'brutal_kill' | timeout(). -type worker() :: 'worker' | 'supervisor'. -type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. -type sup_ref() :: (Name :: atom()) | {Name :: atom(), Node :: node()} | {'global', Name :: atom()} | pid(). --type child_spec() :: {Id :: child_id(), - StartFunc :: mfargs(), - Restart :: restart(), - Shutdown :: shutdown(), - Type :: worker(), - Modules :: modules()}. -type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). -type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. --type startchild_err() :: 'already_present' - | {'already_started', Child :: child()} | term(). --type startchild_ret() :: {'ok', Child :: child()} - | {'ok', Child :: child(), Info :: term()} - | {'error', startchild_err()}. - -type group_name() :: any(). -spec start_link(GroupName, Module, Args) -> startlink_ret() when @@ -183,9 +168,9 @@ Module :: module(), Args :: term(). --spec start_child(SupRef, ChildSpec) -> startchild_ret() when +-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when SupRef :: sup_ref(), - ChildSpec :: child_spec() | (List :: [term()]). + ChildSpec :: supervisor:child_spec() | (List :: [term()]). -spec restart_child(SupRef, Id) -> Result when SupRef :: sup_ref(), @@ -215,12 +200,12 @@ Modules :: modules(). -spec check_childspecs(ChildSpecs) -> Result when - ChildSpecs :: [child_spec()], + ChildSpecs :: [supervisor:child_spec()], Result :: 'ok' | {'error', Error :: term()}. -spec start_internal(Group, ChildSpecs) -> Result when Group :: group_name(), - ChildSpecs :: [child_spec()], + ChildSpecs :: [supervisor:child_spec()], Result :: startlink_ret(). -spec create_tables() -> Result when diff --git a/src/rabbit.erl b/src/rabbit.erl index 607033cb..9609eb04 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -132,7 +132,7 @@ -rabbit_boot_step({recovery, [{description, "exchange, queue and binding recovery"}, {mfa, {rabbit, recover, []}}, - {requires, empty_db_check}, + {requires, core_initialized}, {enables, routing_ready}]}). -rabbit_boot_step({mirror_queue_slave_sup, @@ -158,8 +158,9 @@ {enables, networking}]}). -rabbit_boot_step({direct_client, - [{mfa, {rabbit_direct, boot, []}}, - {requires, log_relay}]}). + [{description, "direct client"}, + {mfa, {rabbit_direct, boot, []}}, + {requires, log_relay}]}). -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index fd03ca85..517dd4ec 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -31,10 +31,9 @@ -ifdef(use_specs). --type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). +-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 0d221b05..655bbb73 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -354,8 +354,8 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). %% For bulk operations we lock the tables we are operating on in order %% to reduce the time complexity. Without the table locks we end up -%% with num_tables*num_bulk_bindings row-level locks. Takiing each -%% lock takes time proportional to the number of existing locks, thus +%% with num_tables*num_bulk_bindings row-level locks. Taking each lock +%% takes time proportional to the number of existing locks, thus %% resulting in O(num_bulk_bindings^2) complexity. %% %% The locks need to be write locks since ultimately we end up diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index dfb400e3..4ba01b4f 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -28,8 +28,9 @@ -ifdef(use_specs). --spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()). --spec(start_link/2 :: ({'local', atom()}, mfa()) -> +-spec(start_link/1 :: (rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) -> rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 348655b1..91c7b5d3 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -52,6 +52,7 @@ validate(_X) -> ok. create(_Tx, _X) -> ok. delete(transaction, #exchange{name = X}, _Bs) -> + trie_remove_all_nodes(X), trie_remove_all_edges(X), trie_remove_all_bindings(X), ok; @@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) -> add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(transaction, #exchange{name = X}, Bs) -> - %% The remove process is split into two distinct phases. In the - %% first phase we gather the lists of bindings and edges to - %% delete, then in the second phase we process all the - %% deletions. This is to prevent interleaving of read/write - %% operations in mnesia that can adversely affect performance. - {ToDelete, Paths} = - lists:foldl( - fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) -> - Path = [{FinalNode, _} | _] = - follow_down_get_path(S, split_topic_key(K)), - {[{FinalNode, D} | Acc], - decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))} - end, {[], gb_trees:empty()}, Bs), - - [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete], - [trie_remove_edge(X, Parent, Node, W) || - {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], +remove_bindings(transaction, _X, Bs) -> + %% See rabbit_binding:lock_route_tables for the rationale for + %% taking table locks. + case Bs of + [_] -> ok; + _ -> [mnesia:lock({table, T}, write) || + T <- [rabbit_topic_trie_node, + rabbit_topic_trie_edge, + rabbit_topic_trie_binding]] + end, + [begin + Path = [{FinalNode, _} | _] = + follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path) + end || #binding{source = X, key = K, destination = D} <- Bs], ok; remove_bindings(none, _X, _Bs) -> ok. -maybe_add_path(_X, [{root, none}], PathAcc) -> - PathAcc; -maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) -> - case gb_trees:is_defined(Node, PathAcc) of - true -> PathAcc; - false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node), - trie_child_count(X, Node)}}, - PathAcc) - end. - -decrement_bindings(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end, - Path, PathAcc). - -decrement_edges(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end, - Path, PathAcc). - -with_path_acc(_X, _Fun, [{root, none}], PathAcc) -> - PathAcc; -with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) -> - {Parent, W, Counts} = gb_trees:get(Node, PathAcc), - NewCounts = Fun(Counts), - NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc), - case NewCounts of - {0, 0} -> decrement_edges(X, ParentPath, - maybe_add_path(X, ParentPath, NewPathAcc)); - _ -> NewPathAcc - end. - - assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). @@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> error -> {error, Acc, Words} end. +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, node_id = Node}, write) of + [] -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath); + _ -> ok + end. + trie_child(X, Node, Word) -> case mnesia:read({rabbit_topic_trie_edge, #trie_edge{exchange_name = X, @@ -199,10 +177,30 @@ trie_bindings(X, Node) -> destination = '$1'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). +trie_update_node_counts(X, Node, Field, Delta) -> + E = case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, + node_id = Node}, write) of + [] -> #topic_trie_node{trie_node = #trie_node{ + exchange_name = X, + node_id = Node}, + edge_count = 0, + binding_count = 0}; + [E0] -> E0 + end, + case setelement(Field, E, element(Field, E) + Delta) of + #topic_trie_node{edge_count = 0, binding_count = 0} -> + ok = mnesia:delete_object(rabbit_topic_trie_node, E, write); + EN -> + ok = mnesia:write(rabbit_topic_trie_node, EN, write) + end. + trie_add_edge(X, FromNode, ToNode, W) -> + trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). trie_remove_edge(X, FromNode, ToNode, W) -> + trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). trie_edge_op(X, FromNode, ToNode, W, Op) -> @@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) -> write). trie_add_binding(X, Node, D) -> + trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1), trie_binding_op(X, Node, D, fun mnesia:write/3). trie_remove_binding(X, Node, D) -> + trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1), trie_binding_op(X, Node, D, fun mnesia:delete_object/3). trie_binding_op(X, Node, D, Op) -> @@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) -> destination = D}}, write). -trie_child_count(X, Node) -> - count(rabbit_topic_trie_edge, - #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -trie_binding_count(X, Node) -> - count(rabbit_topic_trie_binding, - #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -count(Table, Match) -> - length(mnesia:match_object(Table, Match, read)). +trie_remove_all_nodes(X) -> + remove_all(rabbit_topic_trie_node, + #topic_trie_node{trie_node = #trie_node{exchange_name = X, + _ = '_'}, + _ = '_'}). trie_remove_all_edges(X) -> remove_all(rabbit_topic_trie_edge, diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c8c18843..bf997a6f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -268,6 +268,11 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, record_info(fields, topic_trie_node)}, + {type, ordered_set}, + {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]}, {rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, {attributes, record_info(fields, topic_trie_edge)}, @@ -314,12 +319,12 @@ reverse_binding_match() -> _='_'}. binding_destination_match() -> resource_match('_'). +trie_node_match() -> + #trie_node{ exchange_name = exchange_name_match(), _='_'}. trie_edge_match() -> - #trie_edge{exchange_name = exchange_name_match(), - _='_'}. + #trie_edge{ exchange_name = exchange_name_match(), _='_'}. trie_binding_match() -> - #trie_binding{exchange_name = exchange_name_match(), - _='_'}. + #trie_binding{exchange_name = exchange_name_match(), _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 045ab89a..e81f8134 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -24,7 +24,7 @@ close_connection/2, force_connection_event_refresh/0]). %%used by TCP-based transports, e.g. STOMP adapter --export([check_tcp_listener_address/2, +-export([tcp_listener_addresses/1, tcp_listener_spec/6, ensure_ssl/0, ssl_transform_fun/1]). -export([tcp_listener_started/3, tcp_listener_stopped/3, @@ -47,12 +47,16 @@ -export_type([ip_port/0, hostname/0]). -type(hostname() :: inet:hostname()). --type(ip_port() :: inet:ip_port()). +-type(ip_port() :: inet:port_number()). -type(family() :: atom()). -type(listener_config() :: ip_port() | {hostname(), ip_port()} | {hostname(), ip_port(), family()}). +-type(address() :: {inet:ip_address(), ip_port(), family()}). +-type(name_prefix() :: atom()). +-type(protocol() :: atom()). +-type(label() :: string()). -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/1 :: (listener_config()) -> 'ok'). @@ -76,8 +80,10 @@ -spec(force_connection_event_refresh/0 :: () -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(check_tcp_listener_address/2 :: (atom(), listener_config()) - -> [{inet:ip_address(), ip_port(), family(), atom()}]). +-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]). +-spec(tcp_listener_spec/6 :: + (name_prefix(), address(), [gen_tcp:listen_option()], protocol(), + label(), rabbit_types:mfargs()) -> supervisor:child_spec()). -spec(ensure_ssl/0 :: () -> rabbit_types:infos()). -spec(ssl_transform_fun/1 :: (rabbit_types:infos()) @@ -140,39 +146,6 @@ start() -> transient, infinity, supervisor, [rabbit_client_sup]}), ok. -%% inet_parse:address takes care of ip string, like "0.0.0.0" -%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, -%% and runs 'inet_gethost' port process for dns lookups. -%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. - -getaddr(Host, Family) -> - case inet_parse:address(Host) of - {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; - {error, _} -> gethostaddr(Host, Family) - end. - -gethostaddr(Host, auto) -> - Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], - case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of - [] -> host_lookup_error(Host, Lookups); - IPs -> IPs - end; - -gethostaddr(Host, Family) -> - case inet:getaddr(Host, Family) of - {ok, IPAddress} -> [{IPAddress, Family}]; - {error, Reason} -> host_lookup_error(Host, Reason) - end. - -host_lookup_error(Host, Reason) -> - error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), - throw({error, {invalid_host, Host, Reason}}). - -resolve_family({_,_,_,_}, auto) -> inet; -resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; -resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); -resolve_family(_, F) -> F. - ensure_ssl() -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options), @@ -201,31 +174,36 @@ ssl_transform_fun(SslOpts) -> end end. -check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) -> - check_tcp_listener_address_auto(NamePrefix, Port); - -check_tcp_listener_address(NamePrefix, {"auto", Port}) -> +tcp_listener_addresses(Port) when is_integer(Port) -> + tcp_listener_addresses_auto(Port); +tcp_listener_addresses({"auto", Port}) -> %% Variant to prevent lots of hacking around in bash and batch files - check_tcp_listener_address_auto(NamePrefix, Port); - -check_tcp_listener_address(NamePrefix, {Host, Port}) -> + tcp_listener_addresses_auto(Port); +tcp_listener_addresses({Host, Port}) -> %% auto: determine family IPv4 / IPv6 after converting to IP address - check_tcp_listener_address(NamePrefix, {Host, Port, auto}); - -check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) -> - if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; - true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", - [Port]), - throw({error, {invalid_port, Port}}) - end, - [{IPAddress, Port, Family, - rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} || - {IPAddress, Family} <- getaddr(Host, Family0)]. - -check_tcp_listener_address_auto(NamePrefix, Port) -> - lists:append([check_tcp_listener_address(NamePrefix, Listener) || + tcp_listener_addresses({Host, Port, auto}); +tcp_listener_addresses({Host, Port, Family0}) + when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> + [{IPAddress, Port, Family} || + {IPAddress, Family} <- getaddr(Host, Family0)]; +tcp_listener_addresses({_Host, Port, _Family0}) -> + error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), + throw({error, {invalid_port, Port}}). + +tcp_listener_addresses_auto(Port) -> + lists:append([tcp_listener_addresses(Listener) || Listener <- port_to_listeners(Port)]). +tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts, + Protocol, Label, OnConnect) -> + {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), + {tcp_listener_sup, start_link, + [IPAddress, Port, [Family | SocketOpts], + {?MODULE, tcp_listener_started, [Protocol]}, + {?MODULE, tcp_listener_stopped, [Protocol]}, + OnConnect, Label]}, + transient, infinity, supervisor, [tcp_listener_sup]}. + start_tcp_listener(Listener) -> start_listener(Listener, amqp, "TCP Listener", {?MODULE, start_client, []}). @@ -235,27 +213,21 @@ start_ssl_listener(Listener, SslOpts) -> {?MODULE, start_ssl_client, [SslOpts]}). start_listener(Listener, Protocol, Label, OnConnect) -> - [start_listener0(Spec, Protocol, Label, OnConnect) || - Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + [start_listener0(Address, Protocol, Label, OnConnect) || + Address <- tcp_listener_addresses(Listener)], ok. -start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) -> - {ok,_} = supervisor:start_child( - rabbit_sup, - {Name, - {tcp_listener_sup, start_link, - [IPAddress, Port, [Family | tcp_opts()], - {?MODULE, tcp_listener_started, [Protocol]}, - {?MODULE, tcp_listener_stopped, [Protocol]}, - OnConnect, Label]}, - transient, infinity, supervisor, [tcp_listener_sup]}). +start_listener0(Address, Protocol, Label, OnConnect) -> + Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(), + Protocol, Label, OnConnect), + {ok,_} = supervisor:start_child(rabbit_sup, Spec). stop_tcp_listener(Listener) -> - [stop_tcp_listener0(Spec) || - Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + [stop_tcp_listener0(Address) || + Address <- tcp_listener_addresses(Listener)], ok. -stop_tcp_listener0({IPAddress, Port, _Family, Name}) -> +stop_tcp_listener0({IPAddress, Port, _Family}) -> Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name). @@ -363,6 +335,38 @@ tcp_opts() -> {ok, Opts} = application:get_env(rabbit, tcp_listen_options), Opts. +%% inet_parse:address takes care of ip string, like "0.0.0.0" +%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, +%% and runs 'inet_gethost' port process for dns lookups. +%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. +getaddr(Host, Family) -> + case inet_parse:address(Host) of + {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; + {error, _} -> gethostaddr(Host, Family) + end. + +gethostaddr(Host, auto) -> + Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], + case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of + [] -> host_lookup_error(Host, Lookups); + IPs -> IPs + end; + +gethostaddr(Host, Family) -> + case inet:getaddr(Host, Family) of + {ok, IPAddress} -> [{IPAddress, Family}]; + {error, Reason} -> host_lookup_error(Host, Reason) + end. + +host_lookup_error(Host, Reason) -> + error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), + throw({error, {invalid_host, Host, Reason}}). + +resolve_family({_,_,_,_}, auto) -> inet; +resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; +resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); +resolve_family(_, F) -> F. + %%-------------------------------------------------------------------- %% There are three kinds of machine (for our purposes). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 62c004f7..de2ba8ad 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -55,13 +55,20 @@ start() -> CmdArgsAndOpts -> CmdArgsAndOpts end, Command = list_to_atom(Command0), + PrintInvalidCommandError = + fun () -> + print_error("invalid command '~s'", + [string:join([atom_to_list(Command) | Args], " ")]) + end, case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of ok -> rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - print_error("invalid command '~s'", - [string:join([atom_to_list(Command) | Args], " ")]), + PrintInvalidCommandError(), + usage(); + {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> + PrintInvalidCommandError(), usage(); {error, Reason} -> print_error("~p", [Reason]), @@ -325,6 +332,9 @@ lookup_plugins(Names, AllPlugins) -> read_enabled_plugins(PluginsFile) -> case rabbit_file:read_term_file(PluginsFile) of {ok, [Plugins]} -> Plugins; + {ok, []} -> []; + {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, + PluginsFile}}); {error, enoent} -> []; {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file, PluginsFile, Reason}}) diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index cda3ccbe..1a08efed 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -28,7 +28,8 @@ -ifdef(use_specs). --spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 2db960ac..ae2b5d3f 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -28,12 +28,9 @@ binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, internal_user/0, - username/0, password/0, password_hash/0, ok/1, error/1, - ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, - connection_exit/0]). - --type(channel_exit() :: no_return()). --type(connection_exit() :: no_return()). + username/0, password/0, password_hash/0, + ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, + channel_exit/0, connection_exit/0, mfargs/0]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -156,4 +153,9 @@ -type(ok_or_error2(A, B) :: ok(A) | error(B)). -type(ok_pid_or_error() :: ok_or_error2(pid(), any())). +-type(channel_exit() :: no_return()). +-type(connection_exit() :: no_return()). + +-type(mfargs() :: {atom(), atom(), [any()]}). + -endif. % use_specs diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index e0ca8cbb..f164035e 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -35,6 +35,7 @@ -rabbit_upgrade({gm, mnesia, []}). -rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}). -rabbit_upgrade({mirrored_supervisor, mnesia, []}). +-rabbit_upgrade({topic_trie_node, mnesia, []}). %% ------------------------------------------------------------------- @@ -54,6 +55,7 @@ -spec(gm/0 :: () -> 'ok'). -spec(exchange_scratch/0 :: () -> 'ok'). -spec(mirrored_supervisor/0 :: () -> 'ok'). +-spec(topic_trie_node/0 :: () -> 'ok'). -endif. @@ -177,6 +179,12 @@ mirrored_supervisor() -> [{record_name, mirrored_sup_childspec}, {attributes, [key, mirroring_pid, childspec]}]). +topic_trie_node() -> + create(rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, [trie_node, edge_count, binding_count]}, + {type, ordered_set}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl index 4c835598..cb3dd02c 100644 --- a/src/tcp_acceptor_sup.erl +++ b/src/tcp_acceptor_sup.erl @@ -25,7 +25,11 @@ %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). + +-type(mfargs() :: {atom(), atom(), [any()]}). + +-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()). + -endif. %%---------------------------------------------------------------------------- diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index ad2a0d02..9a82ac88 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -28,9 +28,14 @@ %%---------------------------------------------------------------------------- -ifdef(use_specs). + +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/8 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(), - atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + integer(), atom(), mfargs(), mfargs(), string()) -> + rabbit_types:ok_pid_or_error()). + -endif. %%-------------------------------------------------------------------- diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 5bff5c27..74297b6d 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -26,12 +26,16 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/7 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), - mfa(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + mfargs(), mfargs(), mfargs(), string()) -> + rabbit_types:ok_pid_or_error()). -spec(start_link/8 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), - mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + mfargs(), mfargs(), mfargs(), integer(), string()) -> + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 456ff39f..fcb07a16 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -37,10 +37,11 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). --spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). --spec(submit_async/1 :: - (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). +-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). +-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). -spec(idle/1 :: (any()) -> 'ok'). -endif. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 78ab4df3..b42530e2 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -29,12 +29,12 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). --spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). --spec(submit_async/2 :: - (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). --spec(run/1 :: (fun (() -> A)) -> A; - ({atom(), atom(), [any()]}) -> any()). +-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). +-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). +-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -endif. |