summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVlad Alexandru Ionescu <vlad@rabbitmq.com>2010-09-06 12:01:48 +0100
committerVlad Alexandru Ionescu <vlad@rabbitmq.com>2010-09-06 12:01:48 +0100
commit9698cab2b597aaadbbdc51f26f29c3cc6ddb7f12 (patch)
tree9b3b69a83c75c499663efa3baac03a7539ccc4bc
parent2ae1ddd84c5896bd8888d807533fcfcbd5ad303f (diff)
parent0678a93d7cccb2ecfc76663f09570c5a645a917c (diff)
downloadrabbitmq-server-9698cab2b597aaadbbdc51f26f29c3cc6ddb7f12.tar.gz
merging in from default
-rw-r--r--packaging/generic-unix/Makefile1
-rw-r--r--packaging/windows/Makefile1
-rw-r--r--src/file_handle_cache.erl22
-rw-r--r--src/gen_server2.erl54
-rw-r--r--src/rabbit.erl9
-rw-r--r--src/rabbit_access_control.erl25
-rw-r--r--src/rabbit_amqqueue.erl24
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_binding.erl357
-rw-r--r--src/rabbit_channel.erl64
-rw-r--r--src/rabbit_channel_sup.erl96
-rw-r--r--src/rabbit_channel_sup_sup.erl (renamed from src/rabbit_hooks.erl)52
-rw-r--r--src/rabbit_connection_sup.erl99
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_exchange.erl367
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_framing_channel.erl19
-rw-r--r--src/rabbit_heartbeat.erl59
-rw-r--r--src/rabbit_limiter.erl24
-rw-r--r--src/rabbit_misc.erl20
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_msg_store.erl25
-rw-r--r--src/rabbit_multi.erl9
-rw-r--r--src/rabbit_networking.erl25
-rw-r--r--src/rabbit_plugin_activator.erl74
-rw-r--r--src/rabbit_queue_collector.erl10
-rw-r--r--src/rabbit_reader.erl228
-rw-r--r--src/rabbit_router.erl10
-rw-r--r--src/rabbit_tests.erl78
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_variable_queue.erl8
-rw-r--r--src/rabbit_writer.erl137
-rw-r--r--src/supervisor2.erl6
-rw-r--r--src/tcp_client_sup.erl10
34 files changed, 1043 insertions, 896 deletions
diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile
index 4eade6c7..c4e01f4a 100644
--- a/packaging/generic-unix/Makefile
+++ b/packaging/generic-unix/Makefile
@@ -4,7 +4,6 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION)
dist:
- $(MAKE) -C ../.. VERSION=$(VERSION) srcdist
tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
$(MAKE) -C $(SOURCE_DIR) \
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index f47b5340..abe174e0 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -4,7 +4,6 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_ZIP=rabbitmq-server-windows-$(VERSION)
dist:
- $(MAKE) -C ../.. VERSION=$(VERSION) srcdist
tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
$(MAKE) -C $(SOURCE_DIR)
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f83fa0bc..aecfb096 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -131,6 +131,7 @@
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
-export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
+-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -242,6 +243,7 @@
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
+-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()).
-endif.
@@ -781,7 +783,11 @@ init([]) ->
Watermark > 0) ->
Watermark;
_ ->
- ulimit()
+ case ulimit() of
+ infinity -> infinity;
+ unknown -> ?FILE_HANDLES_LIMIT_OTHER;
+ Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS])
+ end
end,
ObtainLimit = obtain_limit(Limit),
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
@@ -1131,7 +1137,7 @@ track_client(Pid, Clients) ->
ulimit() ->
case os:type() of
{win32, _OsName} ->
- ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS;
+ ?FILE_HANDLES_LIMIT_WINDOWS;
{unix, _OsName} ->
%% Under Linux, Solaris and FreeBSD, ulimit is a shell
%% builtin, not a command. In OS X, it's a command.
@@ -1141,16 +1147,14 @@ ulimit() ->
"unlimited" ->
infinity;
String = [C|_] when $0 =< C andalso C =< $9 ->
- Num = list_to_integer(
- lists:takewhile(
- fun (D) -> $0 =< D andalso D =< $9 end, String)) -
- ?RESERVED_FOR_OTHERS,
- lists:max([1, Num]);
+ list_to_integer(
+ lists:takewhile(
+ fun (D) -> $0 =< D andalso D =< $9 end, String));
_ ->
%% probably a variant of
%% "/bin/sh: line 1: ulimit: command not found\n"
- ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ unknown
end;
_ ->
- ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ unknown
end.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index f1c8eb4d..9fb9e2fe 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -976,7 +976,7 @@ print_event(Dev, Event, Name) ->
terminate(Reason, Name, Msg, Mod, State, Debug) ->
case catch Mod:terminate(Reason, State) of
{'EXIT', R} ->
- error_info(R, Name, Msg, State, Debug),
+ error_info(R, Reason, Name, Msg, State, Debug),
exit(R);
_ ->
case Reason of
@@ -987,42 +987,44 @@ terminate(Reason, Name, Msg, Mod, State, Debug) ->
{shutdown,_}=Shutdown ->
exit(Shutdown);
_ ->
- error_info(Reason, Name, Msg, State, Debug),
+ error_info(Reason, undefined, Name, Msg, State, Debug),
exit(Reason)
end
end.
-error_info(_Reason, application_controller, _Msg, _State, _Debug) ->
+error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
%% OTP-5811 Don't send an error report if it's the system process
%% application_controller which is terminating - let init take care
%% of it instead
ok;
-error_info(Reason, Name, Msg, State, Debug) ->
- Reason1 =
- case Reason of
- {undef,[{M,F,A}|MFAs]} ->
- case code:is_loaded(M) of
- false ->
- {'module could not be loaded',[{M,F,A}|MFAs]};
- _ ->
- case erlang:function_exported(M, F, length(A)) of
- true ->
- Reason;
- false ->
- {'function not exported',[{M,F,A}|MFAs]}
- end
- end;
- _ ->
- Reason
- end,
- format("** Generic server ~p terminating \n"
- "** Last message in was ~p~n"
- "** When Server state == ~p~n"
- "** Reason for termination == ~n** ~p~n",
- [Name, Msg, State, Reason1]),
+error_info(Reason, RootCause, Name, Msg, State, Debug) ->
+ Reason1 = error_reason(Reason),
+ Fmt =
+ "** Generic server ~p terminating~n"
+ "** Last message in was ~p~n"
+ "** When Server state == ~p~n"
+ "** Reason for termination == ~n** ~p~n",
+ case RootCause of
+ undefined -> format(Fmt, [Name, Msg, State, Reason1]);
+ _ -> format(Fmt ++ "** In 'terminate' callback "
+ "with reason ==~n** ~p~n",
+ [Name, Msg, State, Reason1,
+ error_reason(RootCause)])
+ end,
sys:print_log(Debug),
ok.
+error_reason({undef,[{M,F,A}|MFAs]} = Reason) ->
+ case code:is_loaded(M) of
+ false -> {'module could not be loaded',[{M,F,A}|MFAs]};
+ _ -> case erlang:function_exported(M, F, length(A)) of
+ true -> Reason;
+ false -> {'function not exported',[{M,F,A}|MFAs]}
+ end
+ end;
+error_reason(Reason) ->
+ Reason.
+
%%% ---------------------------------------------------
%%% Misc. functions.
%%% ---------------------------------------------------
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 41c628a0..c2574970 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -83,12 +83,6 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
--rabbit_boot_step({rabbit_hooks,
- [{description, "internal event notification system"},
- {mfa, {rabbit_hooks, start, []}},
- {requires, external_infrastructure},
- {enables, kernel_ready}]}).
-
-rabbit_boot_step({rabbit_event,
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -211,8 +205,7 @@
%%----------------------------------------------------------------------------
prepare() ->
- ok = ensure_working_log_handlers(),
- ok = rabbit_mnesia:ensure_mnesia_dir().
+ ok = ensure_working_log_handlers().
start() ->
try
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 8d00f591..9cfe1ca8 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -171,10 +171,6 @@ check_resource_access(Username,
check_resource_access(Username,
R#resource{name = <<"amq.default">>},
Permission);
-check_resource_access(_Username,
- #resource{name = <<"amq.gen",_/binary>>},
- #permission{scope = client}) ->
- ok;
check_resource_access(Username,
R = #resource{virtual_host = VHostPath, name = Name},
Permission) ->
@@ -184,14 +180,19 @@ check_resource_access(Username,
[] ->
false;
[#user_permission{permission = P}] ->
- PermRegexp = case element(permission_index(Permission), P) of
- %% <<"^$">> breaks Emacs' erlang mode
- <<"">> -> <<$^, $$>>;
- RE -> RE
- end,
- case re:run(Name, PermRegexp, [{capture, none}]) of
- match -> true;
- nomatch -> false
+ case {Name, P} of
+ {<<"amq.gen",_/binary>>, #permission{scope = client}} ->
+ true;
+ _ ->
+ PermRegexp = case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
+ case re:run(Name, PermRegexp, [{capture, none}]) of
+ match -> true;
+ nomatch -> false
+ end
end
end,
if Res -> ok;
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0cdb4fff..8a923780 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -56,7 +56,7 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
--define(EXPIRES_TYPE, long).
+-define(EXPIRES_TYPES, [byte, short, signedint, long]).
%%----------------------------------------------------------------------------
@@ -251,8 +251,8 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [],
- fun (_X, _Q) -> ok end),
+ rabbit_binding:add(Exchange, QueueName, RoutingKey, [],
+ fun (_X, _Q) -> ok end),
ok.
lookup(Name) ->
@@ -313,13 +313,13 @@ check_declare_arguments(QueueName, Args) ->
check_expires_argument(undefined) ->
ok;
-check_expires_argument({?EXPIRES_TYPE, Expires})
- when is_integer(Expires) andalso Expires > 0 ->
- ok;
-check_expires_argument({?EXPIRES_TYPE, _Expires}) ->
- {error, expires_zero_or_less};
-check_expires_argument(_) ->
- {error, expires_not_of_type_long}.
+check_expires_argument({Type, Expires}) when Expires > 0 ->
+ case lists:member(Type, ?EXPIRES_TYPES) of
+ true -> ok;
+ false -> {error, {expires_not_of_acceptable_type, Type, Expires}}
+ end;
+check_expires_argument({_Type, _Expires}) ->
+ {error, expires_zero_or_less}.
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -433,7 +433,7 @@ internal_delete1(QueueName) ->
%% we want to execute some things, as
%% decided by rabbit_exchange, after the
%% transaction.
- rabbit_exchange:delete_queue_bindings(QueueName).
+ rabbit_binding:remove_for_queue(QueueName).
internal_delete(QueueName) ->
case
@@ -478,7 +478,7 @@ on_node_down(Node) ->
ok.
delete_queue(QueueName) ->
- Post = rabbit_exchange:delete_transient_queue_bindings(QueueName),
+ Post = rabbit_binding:remove_transient_for_queue(QueueName),
ok = mnesia:delete({rabbit_queue, QueueName}),
Post.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2cab7136..08495862 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -146,8 +146,8 @@ code_change(_OldVsn, State, _Extra) ->
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
- {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
- undefined -> State
+ {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
+ undefined -> State
end.
declare(Recover, From,
@@ -163,10 +163,8 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
- rabbit_event:notify(
- queue_created,
- [{Item, i(Item, State)} ||
- Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(queue_created,
+ infos(?CREATION_EVENT_KEYS, State)),
noreply(init_expires(State#q{backing_queue_state = BQS}));
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -587,8 +585,7 @@ i(Item, _) ->
throw({bad_argument, Item}).
emit_stats(State) ->
- rabbit_event:notify(queue_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+ rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)).
%---------------------------------------------------------------------------
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
new file mode 100644
index 00000000..3569ba93
--- /dev/null
+++ b/src/rabbit_binding.erl
@@ -0,0 +1,357 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_binding).
+-include("rabbit.hrl").
+
+-export([recover/0, add/5, remove/5, list/1]).
+-export([list_for_exchange/1, list_for_queue/1]).
+%% these must all be run inside a mnesia tx
+-export([has_for_exchange/1, remove_for_exchange/1,
+ remove_for_queue/1, remove_transient_for_queue/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([key/0]).
+
+-type(key() :: binary()).
+
+-type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
+ 'exchange_not_found' |
+ 'exchange_and_queue_not_found')).
+-type(inner_fun() ::
+ fun((rabbit_types:exchange(), queue()) ->
+ rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
+
+-spec(recover/0 :: () -> [rabbit_types:binding()]).
+-spec(add/5 ::
+ (rabbit_exchange:name(), rabbit_amqqueue:name(),
+ rabbit_router:routing_key(), rabbit_framing:amqp_table(),
+ inner_fun()) -> bind_res()).
+-spec(remove/5 ::
+ (rabbit_exchange:name(), rabbit_amqqueue:name(),
+ rabbit_router:routing_key(), rabbit_framing:amqp_table(),
+ inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')).
+-spec(list/1 :: (rabbit_types:vhost()) ->
+ [{rabbit_exchange:name(), rabbit_amqqueue:name(),
+ rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
+-spec(list_for_exchange/1 ::
+ (rabbit_exchange:name()) -> [{rabbit_amqqueue:name(),
+ rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
+-spec(list_for_queue/1 ::
+ (rabbit_amqqueue:name()) -> [{rabbit_exchange:name(),
+ rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
+-spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()).
+-spec(remove_for_exchange/1 ::
+ (rabbit_exchange:name()) -> [rabbit_types:binding()]).
+-spec(remove_for_queue/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
+-spec(remove_transient_for_queue/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+recover() ->
+ rabbit_misc:table_fold(
+ fun (Route = #route{binding = B}, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route, Route, write),
+ ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write),
+ [B | Acc]
+ end, [], rabbit_durable_route).
+
+add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
+ case binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(X, Q) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] ->
+ ok = sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:write/3),
+ rabbit_event:notify(
+ binding_created,
+ [{exchange_name, ExchangeName},
+ {queue_name, QueueName},
+ {routing_key, RoutingKey},
+ {arguments, Arguments}]),
+ {new, X, B};
+ [_R] ->
+ {existing, X, B}
+ end;
+ {error, _} = E ->
+ E
+ end
+ end) of
+ {new, Exchange = #exchange{ type = Type }, Binding} ->
+ (type_to_module(Type)):add_binding(Exchange, Binding);
+ {existing, _, _} ->
+ ok;
+ {error, _} = Err ->
+ Err
+ end.
+
+remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
+ case binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] ->
+ {error, binding_not_found};
+ _ ->
+ case InnerFun(X, Q) of
+ ok ->
+ ok =
+ sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ rabbit_event:notify(
+ binding_deleted,
+ [{exchange_name, ExchangeName},
+ {queue_name, QueueName}]),
+ Del = rabbit_exchange:maybe_auto_delete(X),
+ {{Del, X}, B};
+ {error, _} = E ->
+ E
+ end
+ end
+ end) of
+ {error, _} = Err ->
+ Err;
+ {{IsDeleted, X = #exchange{ type = Type }}, B} ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, [B]);
+ not_deleted -> Module:remove_bindings(X, [B])
+ end
+ end.
+
+list(VHostPath) ->
+ [{ExchangeName, QueueName, RoutingKey, Arguments} ||
+ #route{binding = #binding{
+ exchange_name = ExchangeName,
+ key = RoutingKey,
+ queue_name = QueueName,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(
+ rabbit_route,
+ #route{binding = #binding{
+ exchange_name = rabbit_misc:r(VHostPath, exchange),
+ _ = '_'},
+ _ = '_'})].
+
+list_for_exchange(ExchangeName) ->
+ Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ [{QueueName, RoutingKey, Arguments} ||
+ #route{binding = #binding{queue_name = QueueName,
+ key = RoutingKey,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(rabbit_route, Route)].
+
+% Refactoring is left as an exercise for the reader
+list_for_queue(QueueName) ->
+ Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
+ [{ExchangeName, RoutingKey, Arguments} ||
+ #route{binding = #binding{exchange_name = ExchangeName,
+ key = RoutingKey,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(rabbit_route, Route)].
+
+has_for_exchange(ExchangeName) ->
+ Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ %% we need to check for durable routes here too in case a bunch of
+ %% routes to durable queues have been removed temporarily as a
+ %% result of a node failure
+ contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
+
+remove_for_exchange(ExchangeName) ->
+ [begin
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ reverse_route(Route), write),
+ ok = delete_forward_routes(Route),
+ Route#route.binding
+ end || Route <- mnesia:match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
+ write)].
+
+remove_for_queue(QueueName) ->
+ remove_for_queue(QueueName, fun delete_forward_routes/1).
+
+remove_transient_for_queue(QueueName) ->
+ remove_for_queue(QueueName, fun delete_transient_forward_routes/1).
+
+%%----------------------------------------------------------------------------
+
+binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
+ call_with_exchange_and_queue(
+ ExchangeName, QueueName,
+ fun (X, Q) ->
+ Fun(X, Q, #binding{
+ exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = rabbit_misc:sort_field_table(Arguments)})
+ end).
+
+sync_binding(Binding, Durable, Fun) ->
+ ok = case Durable of
+ true -> Fun(rabbit_durable_route,
+ #route{binding = Binding}, write);
+ false -> ok
+ end,
+ {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = Fun(rabbit_route, Route, write),
+ ok = Fun(rabbit_reverse_route, ReverseRoute, write),
+ ok.
+
+call_with_exchange_and_queue(Exchange, Queue, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
+ mnesia:read({rabbit_queue, Queue})} of
+ {[X], [Q]} -> Fun(X, Q);
+ {[ ], [_]} -> {error, exchange_not_found};
+ {[_], [ ]} -> {error, queue_not_found};
+ {[ ], [ ]} -> {error, exchange_and_queue_not_found}
+ end
+ end).
+
+%% Used with atoms from records; e.g., the type is expected to exist.
+type_to_module(T) ->
+ {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ Module.
+
+contains(Table, MatchHead) ->
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
+
+continue('$end_of_table') -> false;
+continue({[_|_], _}) -> true;
+continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+
+remove_for_queue(QueueName, FwdDeleteFun) ->
+ DeletedBindings =
+ [begin
+ Route = reverse_route(ReverseRoute),
+ ok = FwdDeleteFun(Route),
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ ReverseRoute, write),
+ Route#route.binding
+ end || ReverseRoute
+ <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(#route{binding = #binding{
+ queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ Grouped = group_bindings_and_auto_delete(
+ lists:keysort(#binding.exchange_name, DeletedBindings), []),
+ fun () ->
+ lists:foreach(
+ fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, Bs);
+ not_deleted -> Module:remove_bindings(X, Bs)
+ end
+ end, Grouped)
+ end.
+
+%% Requires that its input binding list is sorted in exchange-name
+%% order, so that the grouping of bindings (for passing to
+%% group_bindings_and_auto_delete1) works properly.
+group_bindings_and_auto_delete([], Acc) ->
+ Acc;
+group_bindings_and_auto_delete(
+ [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
+ group_bindings_and_auto_delete(ExchangeName, Bs, [B], Acc).
+
+group_bindings_and_auto_delete(
+ ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
+ Bindings, Acc) ->
+ group_bindings_and_auto_delete(ExchangeName, Bs, [B | Bindings], Acc);
+group_bindings_and_auto_delete(ExchangeName, Removed, Bindings, Acc) ->
+ %% either Removed is [], or its head has a non-matching ExchangeName
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc],
+ group_bindings_and_auto_delete(Removed, NewAcc).
+
+delete_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_durable_route, Route, write).
+
+delete_transient_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write).
+
+route_with_reverse(#route{binding = Binding}) ->
+ route_with_reverse(Binding);
+route_with_reverse(Binding = #binding{}) ->
+ Route = #route{binding = Binding},
+ {Route, reverse_route(Route)}.
+
+reverse_route(#route{binding = Binding}) ->
+ #reverse_route{reverse_binding = reverse_binding(Binding)};
+
+reverse_route(#reverse_route{reverse_binding = Binding}) ->
+ #route{binding = reverse_binding(Binding)}.
+
+reverse_binding(#reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 138df716..60b807fa 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/6, do/2, do/3, shutdown/1]).
+-export([start_link/7, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1, flush/1]).
@@ -44,7 +44,7 @@
handle_info/2, handle_pre_hibernate/1]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
- transaction_id, tx_participants, next_tag,
+ start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer}).
@@ -76,9 +76,11 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/6 ::
+-spec(start_link/7 ::
(channel_number(), pid(), pid(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) -> rabbit_types:ok_pid_or_error()).
+ rabbit_types:vhost(), pid(),
+ fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
+ rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -100,9 +102,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost, CollectorPid], []).
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+ StartLimiterFun) ->
+ gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username,
+ VHost, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -150,15 +153,16 @@ flush(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+ StartLimiterFun]) ->
process_flag(trap_exit, true),
- link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
limiter_pid = undefined,
+ start_limiter_fun = StartLimiterFun,
transaction_id = none,
tx_participants = sets:new(),
next_tag = 1,
@@ -171,9 +175,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
blocking = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = rabbit_event:init_stats_timer()},
- rabbit_event:notify(
- channel_created,
- [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -239,12 +241,6 @@ handle_cast(emit_stats, State) ->
internal_emit_stats(State),
{noreply, State}.
-handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
- State = #ch{writer_pid = WriterPid}) ->
- State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
- {stop, normal, State};
-handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State)}.
@@ -259,8 +255,10 @@ terminate(_Reason, State = #ch{state = terminating}) ->
terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
- normal -> ok = Res;
- _ -> ok
+ normal -> ok = Res;
+ shutdown -> ok = Res;
+ {shutdown, _Term} -> ok = Res;
+ _ -> ok
end,
terminate(State).
@@ -403,7 +401,7 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rollback_and_notify(State),
- ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
+ ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
stop;
handle_method(#'access.request'{},_, State) ->
@@ -807,17 +805,17 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
- QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
- NoWait, State);
+ binding_action(fun rabbit_binding:add/5,
+ ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ #'queue.bind_ok'{}, NoWait, State);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
- QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
- false, State);
+ binding_action(fun rabbit_binding:remove/5,
+ ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ #'queue.unbind_ok'{}, false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
@@ -1016,8 +1014,8 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
- {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) ->
+ {ok, LPid} = SLF(queue:len(UAMQ)),
ok = limit_queues(LPid, State),
LPid.
@@ -1089,11 +1087,9 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
-terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
+terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
- rabbit_event:notify(channel_closed, [{pid, self()}]),
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid).
+ rabbit_event:notify(channel_closed, [{pid, self()}]).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1150,7 +1146,7 @@ update_measures(Type, QX, Inc, Measure) ->
orddict:store(Measure, Cur + Inc, Measures)).
internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
- CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS],
+ CoarseStats = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(StatsTimer) of
coarse ->
rabbit_event:notify(channel_stats, CoarseStats);
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
new file mode 100644
index 00000000..02199a65
--- /dev/null
+++ b/src/rabbit_channel_sup.erl
@@ -0,0 +1,96 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_channel_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/1]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([start_link_args/0]).
+
+-type(start_link_args() ::
+ {rabbit_types:protocol(), rabbit_net:socket(),
+ rabbit_channel:channel_number(), non_neg_integer(), pid(),
+ rabbit_access_control:username(), rabbit_types:vhost(), pid()}).
+
+-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
+ Collector}) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, WriterPid} =
+ supervisor2:start_child(
+ SupPid,
+ {writer, {rabbit_writer, start_link,
+ [Sock, Channel, FrameMax, Protocol, ReaderPid]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ReaderPid, WriterPid, Username, VHost,
+ Collector, start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, FramingChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {framing_channel, {rabbit_framing_channel, start_link,
+ [ReaderPid, ChannelPid, Protocol]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
+ {ok, SupPid, FramingChannelPid}.
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
+
+start_limiter_fun(SupPid) ->
+ fun (UnackedCount) ->
+ Me = self(),
+ {ok, _Pid} =
+ supervisor2:start_child(
+ SupPid,
+ {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]},
+ transient, ?MAX_WAIT, worker, [rabbit_limiter]})
+ end.
diff --git a/src/rabbit_hooks.erl b/src/rabbit_channel_sup_sup.erl
index 3fc84c1e..d1938805 100644
--- a/src/rabbit_hooks.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -29,45 +29,37 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_hooks).
+-module(rabbit_channel_sup_sup).
--export([start/0]).
--export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]).
+-behaviour(supervisor2).
--define(TableName, rabbit_hooks).
+-export([start_link/0, start_channel/2]).
+
+-export([init/1]).
+
+%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start/0 :: () -> 'ok').
--spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok').
--spec(unsubscribe/2 :: (atom(), atom()) -> 'ok').
--spec(trigger/2 :: (atom(), list()) -> 'ok').
--spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok').
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
+ {'ok', pid(), pid()}).
-endif.
-start() ->
- ets:new(?TableName, [bag, public, named_table]),
- ok.
+%%----------------------------------------------------------------------------
-subscribe(Hook, HandlerName, Handler) ->
- ets:insert(?TableName, {Hook, HandlerName, Handler}),
- ok.
+start_link() ->
+ supervisor2:start_link(?MODULE, []).
-unsubscribe(Hook, HandlerName) ->
- ets:match_delete(?TableName, {Hook, HandlerName, '_'}),
- ok.
+start_channel(Pid, Args) ->
+ {ok, ChSupPid, _} = Result = supervisor2:start_child(Pid, [Args]),
+ link(ChSupPid),
+ Result.
-trigger(Hook, Args) ->
- Hooks = ets:lookup(?TableName, Hook),
- [case catch apply(M, F, [Hook, Name, Args | A]) of
- {'EXIT', Reason} ->
- rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p",
- [Name, Hook, Reason]);
- _ -> ok
- end || {_, Name, {M, F, A}} <- Hooks],
- ok.
+%%----------------------------------------------------------------------------
-notify_remote(Hook, HandlerName, Args, Pid, PidArgs) ->
- Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]},
- ok.
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{channel_sup, {rabbit_channel_sup, start_link, []},
+ temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
new file mode 100644
index 00000000..b3821d3b
--- /dev/null
+++ b/src/rabbit_connection_sup.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_connection_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0, reader/1]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid(), pid()}).
+-spec(reader/1 :: (pid()) -> pid()).
+
+-endif.
+
+%%--------------------------------------------------------------------------
+
+start_link() ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ {ok, Collector} =
+ supervisor2:start_child(
+ SupPid,
+ {collector, {rabbit_queue_collector, start_link, []},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+ {ok, ReaderPid} =
+ supervisor2:start_child(
+ SupPid,
+ {reader, {rabbit_reader, start_link,
+ [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
+ {ok, SupPid, ReaderPid}.
+
+reader(Pid) ->
+ hd(supervisor2:find_child(Pid, reader)).
+
+%%--------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
+
+start_heartbeat_fun(SupPid) ->
+ fun (_Sock, 0) ->
+ none;
+ (Sock, TimeoutSec) ->
+ Parent = self(),
+ {ok, Sender} =
+ supervisor2:start_child(
+ SupPid, {heartbeat_sender,
+ {rabbit_heartbeat, start_heartbeat_sender,
+ [Parent, Sock, TimeoutSec]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {ok, Receiver} =
+ supervisor2:start_child(
+ SupPid, {heartbeat_receiver,
+ {rabbit_heartbeat, start_heartbeat_receiver,
+ [Parent, Sock, TimeoutSec]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {Sender, Receiver}
+ end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index f0b623c2..cca2e3d1 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -252,7 +252,7 @@ action(list_bindings, Node, _Args, Opts, Inform) ->
InfoKeys = [exchange_name, queue_name, routing_key, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
- X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
+ X <- rpc_call(Node, rabbit_binding, list, [VHostArg])],
InfoKeys);
action(list_connections, Node, Args, _Opts, Inform) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index af4eb1bd..40bee25f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -34,38 +34,19 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
- info/1, info/2, info_all/1, info_all/2, publish/2]).
--export([add_binding/5, delete_binding/5, list_bindings/1]).
--export([delete/2]).
--export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([assert_equivalence/5]).
--export([assert_args_equivalence/2]).
--export([check_type/1]).
-
-%% EXTENDED API
--export([list_exchange_bindings/1]).
--export([list_queue_bindings/1]).
-
--import(mnesia).
--import(sets).
--import(lists).
+ info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
+%% this must be run inside a mnesia tx
+-export([maybe_auto_delete/1]).
+-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([name/0, type/0, binding_key/0]).
+-export_type([name/0, type/0]).
-type(name() :: rabbit_types:r('exchange')).
-type(type() :: atom()).
--type(binding_key() :: binary()).
-
--type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
- 'exchange_not_found' |
- 'exchange_and_queue_not_found')).
--type(inner_fun() ::
- fun((rabbit_types:exchange(), queue()) ->
- rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 ::
@@ -97,32 +78,12 @@
-> [[rabbit_types:info()]]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> {rabbit_router:routing_result(), [pid()]}).
--spec(add_binding/5 ::
- (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun()) -> bind_res()).
--spec(delete_binding/5 ::
- (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun())
- -> bind_res() | rabbit_types:error('binding_not_found')).
--spec(list_bindings/1 ::
- (rabbit_types:vhost())
- -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(delete_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
--spec(delete_transient_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
--spec(list_queue_bindings/1 ::
- (rabbit_amqqueue:name())
- -> [{name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(list_exchange_bindings/1 ::
- (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
+-spec(maybe_auto_delete/1:: (rabbit_types:exchange()) ->
+ 'not_deleted' | 'auto_deleted').
-endif.
@@ -136,19 +97,7 @@ recover() ->
ok = mnesia:write(rabbit_exchange, Exchange, write),
[Exchange | Acc]
end, [], rabbit_durable_exchange),
- Bs = rabbit_misc:table_fold(
- fun (Route = #route{binding = B}, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route,
- Route, write),
- ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write),
- [B | Acc]
- end, [], rabbit_durable_route),
- recover_with_bindings(Bs, Exs),
- ok.
-
-recover_with_bindings(Bs, Exs) ->
+ Bs = rabbit_binding:recover(),
recover_with_bindings(
lists:keysort(#binding.exchange_name, Bs),
lists:keysort(#exchange.name, Exs), []).
@@ -164,11 +113,11 @@ recover_with_bindings([], [], []) ->
ok.
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
- Exchange = #exchange{name = ExchangeName,
- type = Type,
- durable = Durable,
+ Exchange = #exchange{name = ExchangeName,
+ type = Type,
+ durable = Durable,
auto_delete = AutoDelete,
- arguments = Args},
+ arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
%% value.
@@ -192,9 +141,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
end
end) of
{new, X} -> TypeModule:create(X),
- rabbit_event:notify(
- exchange_created,
- [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]),
+ rabbit_event:notify(exchange_created, info(X)),
X;
{existing, X} -> X;
Err -> Err
@@ -220,9 +167,9 @@ check_type(TypeBin) ->
end
end.
-assert_equivalence(X = #exchange{ durable = Durable,
+assert_equivalence(X = #exchange{ durable = Durable,
auto_delete = AutoDelete,
- type = Type},
+ type = Type},
Type, Durable, AutoDelete, RequiredArgs) ->
(type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
@@ -232,8 +179,7 @@ assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
"cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
-assert_args_equivalence(#exchange{ name = Name,
- arguments = Args },
+assert_args_equivalence(#exchange{ name = Name, arguments = Args },
RequiredArgs) ->
%% The spec says "Arguments are compared for semantic
%% equivalence". The only arg we care about is
@@ -311,92 +257,6 @@ publish(X = #exchange{type = Type}, Seen, Delivery) ->
R
end.
-%% TODO: Should all of the route and binding management not be
-%% refactored to its own module, especially seeing as unbind will have
-%% to be implemented for 0.91 ?
-
-delete_exchange_bindings(ExchangeName) ->
- [begin
- ok = mnesia:delete_object(rabbit_reverse_route,
- reverse_route(Route), write),
- ok = delete_forward_routes(Route),
- Route#route.binding
- end || Route <- mnesia:match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- write)].
-
-delete_queue_bindings(QueueName) ->
- delete_queue_bindings(QueueName, fun delete_forward_routes/1).
-
-delete_transient_queue_bindings(QueueName) ->
- delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
-
-delete_queue_bindings(QueueName, FwdDeleteFun) ->
- DeletedBindings =
- [begin
- Route = reverse_route(ReverseRoute),
- ok = FwdDeleteFun(Route),
- ok = mnesia:delete_object(rabbit_reverse_route,
- ReverseRoute, write),
- Route#route.binding
- end || ReverseRoute
- <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(#route{binding = #binding{
- queue_name = QueueName,
- _ = '_'}}),
- write)],
- Cleanup = cleanup_deleted_queue_bindings(
- lists:keysort(#binding.exchange_name, DeletedBindings), []),
- fun () ->
- lists:foreach(
- fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, Bs);
- not_deleted -> Module:remove_bindings(X, Bs)
- end
- end, Cleanup)
- end.
-
-%% Requires that its input binding list is sorted in exchange-name
-%% order, so that the grouping of bindings (for passing to
-%% cleanup_deleted_queue_bindings1) works properly.
-cleanup_deleted_queue_bindings([], Acc) ->
- Acc;
-cleanup_deleted_queue_bindings(
- [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
- cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc).
-
-cleanup_deleted_queue_bindings(
- ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
- Bindings, Acc) ->
- cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc);
-cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) ->
- %% either Deleted is [], or its head has a non-matching ExchangeName
- NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc],
- cleanup_deleted_queue_bindings(Deleted, NewAcc).
-
-cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- {maybe_auto_delete(X), Bindings}.
-
-delete_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write),
- ok = mnesia:delete_object(rabbit_durable_route, Route, write).
-
-delete_transient_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write).
-
-contains(Table, MatchHead) ->
- continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
-
-continue('$end_of_table') -> false;
-continue({[_|_], _}) -> true;
-continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, Exchange}) of
@@ -405,156 +265,6 @@ call_with_exchange(Exchange, Fun) ->
end
end).
-call_with_exchange_and_queue(Exchange, Queue, Fun) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
- mnesia:read({rabbit_queue, Queue})} of
- {[X], [Q]} -> Fun(X, Q);
- {[ ], [_]} -> {error, exchange_not_found};
- {[_], [ ]} -> {error, queue_not_found};
- {[ ], [ ]} -> {error, exchange_and_queue_not_found}
- end
- end).
-
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
- case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- case InnerFun(X, Q) of
- ok ->
- case mnesia:read({rabbit_route, B}) of
- [] ->
- ok = sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:write/3),
- rabbit_event:notify(
- binding_created,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName},
- {routing_key, RoutingKey},
- {arguments, Arguments}]),
- {new, X, B};
- [_R] ->
- {existing, X, B}
- end;
- {error, _} = E ->
- E
- end
- end) of
- {new, Exchange = #exchange{ type = Type }, Binding} ->
- (type_to_module(Type)):add_binding(Exchange, Binding);
- {existing, _, _} ->
- ok;
- {error, _} = Err ->
- Err
- end.
-
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
- case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] ->
- {error, binding_not_found};
- _ ->
- case InnerFun(X, Q) of
- ok ->
- ok =
- sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- rabbit_event:notify(
- binding_deleted,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName}]),
- {maybe_auto_delete(X), B};
- {error, _} = E ->
- E
- end
- end
- end) of
- {error, _} = Err ->
- Err;
- {{IsDeleted, X = #exchange{ type = Type }}, B} ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, [B]);
- not_deleted -> Module:remove_bindings(X, [B])
- end
- end.
-
-binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
- call_with_exchange_and_queue(
- ExchangeName, QueueName,
- fun (X, Q) ->
- Fun(X, Q, #binding{
- exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = rabbit_misc:sort_field_table(Arguments)})
- end).
-
-sync_binding(Binding, Durable, Fun) ->
- ok = case Durable of
- true -> Fun(rabbit_durable_route,
- #route{binding = Binding}, write);
- false -> ok
- end,
- {Route, ReverseRoute} = route_with_reverse(Binding),
- ok = Fun(rabbit_route, Route, write),
- ok = Fun(rabbit_reverse_route, ReverseRoute, write),
- ok.
-
-list_bindings(VHostPath) ->
- [{ExchangeName, QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- key = RoutingKey,
- queue_name = QueueName,
- args = Arguments}}
- <- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{
- exchange_name = rabbit_misc:r(VHostPath, exchange),
- _ = '_'},
- _ = '_'})].
-
-route_with_reverse(#route{binding = Binding}) ->
- route_with_reverse(Binding);
-route_with_reverse(Binding = #binding{}) ->
- Route = #route{binding = Binding},
- {Route, reverse_route(Route)}.
-
-reverse_route(#route{binding = Binding}) ->
- #reverse_route{reverse_binding = reverse_binding(Binding)};
-
-reverse_route(#reverse_route{reverse_binding = Binding}) ->
- #route{binding = reverse_binding(Binding)}.
-
-reverse_binding(#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
- #binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args};
-
-reverse_binding(#binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
- #reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}.
-
delete(ExchangeName, IfUnused) ->
Fun = case IfUnused of
true -> fun conditional_delete/1;
@@ -568,54 +278,23 @@ delete(ExchangeName, IfUnused) ->
Error
end.
-maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {not_deleted, Exchange};
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
+maybe_auto_delete(#exchange{auto_delete = false}) ->
+ not_deleted;
+maybe_auto_delete(#exchange{auto_delete = true} = Exchange) ->
case conditional_delete(Exchange) of
- {error, in_use} -> {not_deleted, Exchange};
- {deleted, Exchange, []} -> {auto_deleted, Exchange}
+ {error, in_use} -> not_deleted;
+ {deleted, Exchange, []} -> auto_deleted
end.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
- %% we need to check for durable routes here too in case a bunch of
- %% routes to durable queues have been removed temporarily as a
- %% result of a node failure
- case contains(rabbit_route, Match) orelse
- contains(rabbit_durable_route, Match) of
+ case rabbit_binding:has_for_exchange(ExchangeName) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Bindings = delete_exchange_bindings(ExchangeName),
+ Bindings = rabbit_binding:remove_for_exchange(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}),
rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]),
{deleted, Exchange, Bindings}.
-
-%%----------------------------------------------------------------------------
-%% EXTENDED API
-%% These are API calls that are not used by the server internally,
-%% they are exported for embedded clients to use
-
-%% This is currently used in mod_rabbit.erl (XMPP) and expects this to
-%% return {QueueName, RoutingKey, Arguments} tuples
-list_exchange_bindings(ExchangeName) ->
- Route = #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- [{QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{queue_name = QueueName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
-
-% Refactoring is left as an exercise for the reader
-list_queue_bindings(QueueName) ->
- Route = #route{binding = #binding{queue_name = QueueName,
- _ = '_'}},
- [{ExchangeName, RoutingKey, Arguments} ||
- #route{binding = #binding{exchange_name = ExchangeName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 44607398..02e829ec 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -79,8 +79,8 @@ parse_x_match(Other) ->
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort
-%% (rabbit_misc:sort_field_table) that route/3 and
-%% rabbit_exchange:{add,delete}_binding/4 do.
+%% (rabbit_misc:sort_field_table) that publish/1 and
+%% rabbit_binding:{add,remove}/5 do.
%%
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 553faaa8..cb53185f 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -39,16 +39,9 @@
%%--------------------------------------------------------------------
-start_link(StartFun, StartArgs, Protocol) ->
- Parent = self(),
- {ok, spawn_link(
- fun () ->
- %% we trap exits so that a normal termination of
- %% the channel or reader process terminates us too.
- process_flag(trap_exit, true),
- {ok, ChannelPid} = apply(StartFun, StartArgs),
- mainloop(Parent, ChannelPid, Protocol)
- end)}.
+start_link(Parent, ChannelPid, Protocol) ->
+ {ok, proc_lib:spawn_link(
+ fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -62,12 +55,6 @@ shutdown(Pid) ->
read_frame(ChannelPid) ->
receive
- %% converting the exit signal into one of our own ensures that
- %% the reader sees the right pid (i.e. ours) when a channel
- %% exits. Similarly in the other direction, though it is not
- %% really relevant there since the channel is not specifically
- %% watching out for reader exit signals.
- {'EXIT', _Pid, Reason} -> exit(Reason);
{frame, Frame} -> Frame;
terminate -> rabbit_channel:shutdown(ChannelPid),
read_frame(ChannelPid);
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index ab50c28c..a9945af1 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,16 +31,26 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]).
+-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
+ pause_monitor/1, resume_monitor/1]).
+
+-include("rabbit.hrl").
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([heartbeaters/0]).
+
-type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})).
--spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) ->
- heartbeaters()).
+-spec(start_heartbeat_sender/3 ::
+ (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/3 ::
+ (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -48,27 +58,26 @@
%%----------------------------------------------------------------------------
-start_heartbeat(_Sock, 0) ->
- none;
-start_heartbeat(Sock, TimeoutSec) ->
- Parent = self(),
+start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- Sender = heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () ->
- catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
- continue
- end}, Parent),
+ heartbeater(
+ {Sock, TimeoutSec * 1000 div 2, send_oct, 0,
+ fun () ->
+ catch rabbit_net:send(
+ Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ continue
+ end}).
+
+start_heartbeat_receiver(Parent, Sock, TimeoutSec) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- Receiver = heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
- fun () ->
- Parent ! timeout,
- stop
- end}, Parent),
- {Sender, Receiver}.
+ heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
+ Parent ! timeout,
+ stop
+ end}).
pause_monitor(none) ->
ok;
@@ -84,21 +93,15 @@ resume_monitor({_Sender, Receiver}) ->
%%----------------------------------------------------------------------------
-heartbeater(Params, Parent) ->
- spawn_link(fun () -> heartbeater(Params, erlang:monitor(process, Parent),
- {0, 0})
- end).
+heartbeater(Params) ->
+ {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
- MonitorRef, {StatVal, SameCount}) ->
- Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
+ {StatVal, SameCount}) ->
+ Recurse = fun (V) -> heartbeater(Params, V) end,
receive
- {'DOWN', MonitorRef, process, _Object, _Info} ->
- ok;
pause ->
receive
- {'DOWN', MonitorRef, process, _Object, _Info} ->
- ok;
resume ->
Recurse({0, 0});
Other ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 9894a850..da7078f1 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -35,7 +35,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
--export([start_link/2, shutdown/1]).
+-export([start_link/2]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1]).
@@ -47,7 +47,6 @@
-spec(start_link/2 :: (pid(), non_neg_integer()) ->
rabbit_types:ok_pid_or_error()).
--spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
@@ -77,17 +76,10 @@
start_link(ChPid, UnackedMsgCount) ->
gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []).
-shutdown(undefined) ->
- ok;
-shutdown(LimiterPid) ->
- true = unlink(LimiterPid),
- gen_server2:cast(LimiterPid, shutdown).
-
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- unlink_on_stopped(LimiterPid,
- gen_server2:call(LimiterPid, {limit, PrefetchCount})).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -125,8 +117,7 @@ block(LimiterPid) ->
unblock(undefined) ->
ok;
unblock(LimiterPid) ->
- unlink_on_stopped(LimiterPid,
- gen_server2:call(LimiterPid, unblock, infinity)).
+ gen_server2:call(LimiterPid, unblock, infinity).
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -165,9 +156,6 @@ handle_call(unblock, _From, State) ->
{stop, State1} -> {stop, normal, stopped, State1}
end.
-handle_cast(shutdown, State) ->
- {stop, normal, State};
-
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
@@ -247,9 +235,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
ok
end,
State#lim{queues = NewQueues}.
-
-unlink_on_stopped(LimiterPid, stopped) ->
- ok = rabbit_misc:unlink_and_capture_exit(LimiterPid),
- stopped;
-unlink_on_stopped(_LimiterPid, Result) ->
- Result.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 5fa3f8ed..086d260e 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -39,7 +39,6 @@
-export([die/1, frame_error/2, amqp_error/4,
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
--export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
-export([table_lookup/2]).
-export([r/3, r/2, r_arg/4, rs/1]).
@@ -108,10 +107,6 @@
rabbit_framing:amqp_table(),
rabbit_types:r(any()), [binary()]) ->
'ok' | rabbit_types:connection_exit()).
--spec(get_config/1 ::
- (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')).
--spec(get_config/2 :: (atom(), A) -> A).
--spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 ::
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(table_lookup/2 ::
@@ -240,21 +235,6 @@ assert_args_equivalence1(Orig, New, Name, Key) ->
[Key, rabbit_misc:rs(Name), New1, Orig1])
end.
-get_config(Key) ->
- case dirty_read({rabbit_config, Key}) of
- {ok, {rabbit_config, Key, V}} -> {ok, V};
- Other -> Other
- end.
-
-get_config(Key, DefaultValue) ->
- case get_config(Key) of
- {ok, V} -> V;
- {error, not_found} -> DefaultValue
- end.
-
-set_config(Key, Value) ->
- ok = mnesia:dirty_write({rabbit_config, Key, Value}).
-
dirty_read(ReadSpec) ->
case mnesia:dirty_read(ReadSpec) of
[Result] -> {ok, Result};
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 37708b22..8136c613 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -169,10 +169,6 @@ table_definitions() ->
{attributes, record_info(fields, vhost)},
{disc_copies, [node()]},
{match, #vhost{_='_'}}]},
- {rabbit_config,
- [{attributes, [key, val]}, % same mnesia's default
- {disc_copies, [node()]},
- {match, {rabbit_config, '_', '_'}}]},
{rabbit_listener,
[{record_name, listener},
{attributes, record_info(fields, listener)},
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 6576bfbb..a9c7db76 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/2, client_terminate/1,
+ sync/3, client_init/2, client_terminate/2,
client_delete_and_terminate/3, successfully_recovered_state/1]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -136,7 +136,7 @@
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(client_init/2 :: (server(), binary()) -> client_msstate()).
--spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
-spec(client_delete_and_terminate/3 ::
(client_msstate(), server(), binary()) -> 'ok').
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
@@ -373,13 +373,13 @@ client_init(Server, Ref) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
-client_terminate(CState) ->
+client_terminate(CState, Server) ->
close_all_handles(CState),
- ok.
+ ok = gen_server2:call(Server, client_terminate, infinity).
client_delete_and_terminate(CState, Server, Ref) ->
- ok = client_terminate(CState),
- ok = gen_server2:call(Server, {delete_client, Ref}, infinity).
+ close_all_handles(CState),
+ ok = gen_server2:cast(Server, {client_delete, Ref}).
successfully_recovered_state(Server) ->
gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
@@ -606,10 +606,8 @@ handle_call({new_client_state, CRef}, _From,
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({delete_client, CRef}, _From,
- State = #msstate { client_refs = ClientRefs }) ->
- reply(ok,
- State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
+handle_call(client_terminate, _From, State) ->
+ reply(ok, State).
handle_cast({write, Guid},
State = #msstate { current_file_handle = CurHdl,
@@ -724,7 +722,12 @@ handle_cast({gc_done, Reclaimed, Src, Dst},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State).
+ noreply(State);
+
+handle_cast({client_delete, CRef},
+ State = #msstate { client_refs = ClientRefs }) ->
+ noreply(
+ State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
handle_info(timeout, State) ->
noreply(internal_sync(State));
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 3facef17..c7a5a600 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -93,7 +93,14 @@ usage() ->
action(start_all, [NodeCount], RpcTimeout) ->
io:format("Starting all nodes...~n", []),
application:load(rabbit),
- NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")),
+ {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts(
+ getenv("RABBITMQ_NODENAME")),
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ throw({cannot_connect_to_epmd, NodeHost, EpmdReason});
+ {ok, _} ->
+ ok
+ end,
{NodePids, Running} =
case list_to_integer(NodeCount) of
1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index f968b0d8..08272afe 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -107,7 +107,15 @@ boot_ssl() ->
ok;
{ok, SslListeners} ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
- {ok, SslOpts} = application:get_env(ssl_options),
+ {ok, SslOptsConfig} = application:get_env(ssl_options),
+ SslOpts =
+ case proplists:get_value(verify, SslOptsConfig, verify_none) of
+ verify_none -> SslOptsConfig;
+ verify_peer -> [{verify_fun, fun([]) -> true;
+ ([_|_]) -> false
+ end}
+ | SslOptsConfig]
+ end,
[start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
ok
end.
@@ -118,7 +126,7 @@ start() ->
{rabbit_tcp_client_sup,
{tcp_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
- {rabbit_reader,start_link,[]}]},
+ {rabbit_connection_sup,start_link,[]}]},
transient, infinity, supervisor, [tcp_client_sup]}),
ok.
@@ -204,10 +212,10 @@ on_node_down(Node) ->
ok = mnesia:dirty_delete(rabbit_listener, Node).
start_client(Sock, SockTransform) ->
- {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
- ok = rabbit_net:controlling_process(Sock, Child),
- Child ! {go, Sock, SockTransform},
- Child.
+ {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
+ ok = rabbit_net:controlling_process(Sock, Reader),
+ Reader ! {go, Sock, SockTransform},
+ Reader.
start_client(Sock) ->
start_client(Sock, fun (S) -> {ok, S} end).
@@ -230,8 +238,9 @@ start_ssl_client(SslOpts, Sock) ->
end).
connections() ->
- [Pid || {_, Pid, _, _} <- supervisor:which_children(
- rabbit_tcp_client_sup)].
+ [rabbit_connection_sup:reader(ConnSup) ||
+ {_, ConnSup, supervisor, _}
+ <- supervisor:which_children(rabbit_tcp_client_sup)].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index c9f75be0..b23776cd 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -51,7 +51,7 @@
%%----------------------------------------------------------------------------
start() ->
- io:format("Activating RabbitMQ plugins ..."),
+ io:format("Activating RabbitMQ plugins ...~n"),
%% Ensure Rabbit is loaded so we can access it's environment
application:load(rabbit),
@@ -77,7 +77,7 @@ start() ->
AppList
end,
AppVersions = [determine_version(App) || App <- AllApps],
- {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions),
+ RabbitVersion = proplists:get_value(rabbit, AppVersions),
%% Build the overall release descriptor
RDesc = {release,
@@ -130,8 +130,9 @@ start() ->
ok -> ok;
error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
- io:format("~n~w plugins activated:~n", [length(PluginApps)]),
- [io:format("* ~w~n", [App]) || App <- PluginApps],
+ io:format("~w plugins activated:~n", [length(PluginApps)]),
+ [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
+ || App <- PluginApps],
io:nl(),
halt(),
ok.
@@ -150,29 +151,33 @@ determine_version(App) ->
{ok, Vsn} = application:get_key(App, vsn),
{App, Vsn}.
-assert_dir(Dir) ->
- case filelib:is_dir(Dir) of
- true -> ok;
- false -> ok = filelib:ensure_dir(Dir),
- ok = file:make_dir(Dir)
- end.
-
-delete_dir(Dir) ->
- case filelib:is_dir(Dir) of
+delete_recursively(Fn) ->
+ case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
true ->
- case file:list_dir(Dir) of
+ case file:list_dir(Fn) of
{ok, Files} ->
- [case Dir ++ "/" ++ F of
- Fn ->
- case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
- true -> delete_dir(Fn);
- false -> file:delete(Fn)
- end
- end || F <- Files]
- end,
- ok = file:del_dir(Dir);
+ case lists:foldl(fun ( Fn1, ok) -> delete_recursively(
+ Fn ++ "/" ++ Fn1);
+ (_Fn1, Err) -> Err
+ end, ok, Files) of
+ ok -> case file:del_dir(Fn) of
+ ok -> ok;
+ {error, E} -> {error,
+ {cannot_delete, Fn, E}}
+ end;
+ Err -> Err
+ end;
+ {error, E} ->
+ {error, {cannot_list_files, Fn, E}}
+ end;
false ->
- ok
+ case filelib:is_file(Fn) of
+ true -> case file:delete(Fn) of
+ ok -> ok;
+ {error, E} -> {error, {cannot_delete, Fn, E}}
+ end;
+ false -> ok
+ end
end.
is_symlink(Name) ->
@@ -181,13 +186,18 @@ is_symlink(Name) ->
_ -> false
end.
-unpack_ez_plugins(PluginSrcDir, PluginDestDir) ->
+unpack_ez_plugins(SrcDir, DestDir) ->
%% Eliminate the contents of the destination directory
- delete_dir(PluginDestDir),
-
- assert_dir(PluginDestDir),
- [unpack_ez_plugin(PluginName, PluginDestDir) ||
- PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")].
+ case delete_recursively(DestDir) of
+ ok -> ok;
+ {error, E} -> error("Could not delete dir ~s (~p)", [DestDir, E])
+ end,
+ case filelib:ensure_dir(DestDir ++ "/") of
+ ok -> ok;
+ {error, E2} -> error("Could not create dir ~s (~p)", [DestDir, E2])
+ end,
+ [unpack_ez_plugin(PluginName, DestDir) ||
+ PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")].
unpack_ez_plugin(PluginFn, PluginDestDir) ->
zip:unzip(PluginFn, [{cwd, PluginDestDir}]),
@@ -246,8 +256,8 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) ->
- [Entry, {apply,{rabbit,prepare,[]}}];
+process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
+ [{apply,{rabbit,prepare,[]}}, Entry];
process_entry(Entry) ->
[Entry].
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index b056d60b..0a49b94d 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server).
--export([start_link/0, register/2, delete_all/1, shutdown/1]).
+-export([start_link/0, register/2, delete_all/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -49,7 +49,6 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
--spec(shutdown/1 :: (pid()) -> 'ok').
-endif.
@@ -64,9 +63,6 @@ register(CollectorPid, Q) ->
delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).
-shutdown(CollectorPid) ->
- gen_server:cast(CollectorPid, shutdown).
-
%%----------------------------------------------------------------------------
init([]) ->
@@ -90,8 +86,8 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
|| {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State}.
-handle_cast(shutdown, State) ->
- {stop, normal, State}.
+handle_cast(Msg, State) ->
+ {stop, {unhandled_cast, Msg}, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
State = #state{queues = Queues}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a133bf45..a21961b5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,11 +33,11 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
+-export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/1, mainloop/2]).
+-export([init/4, mainloop/2]).
-export([conserve_memory/2, server_properties/0]).
@@ -46,7 +46,6 @@
-export([emit_stats/1]).
-import(gen_tcp).
--import(fprof).
-import(inet).
-import(prim_inet).
@@ -60,7 +59,8 @@
%---------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
- connection_state, queue_collector, heartbeater, stats_timer}).
+ connection_state, queue_collector, heartbeater, stats_timer,
+ channel_sup_sup_pid, start_heartbeat_fun}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -160,6 +160,12 @@
-ifdef(use_specs).
+-type(start_heartbeat_fun() ::
+ fun ((rabbit_networking:socket(), non_neg_integer()) ->
+ rabbit_heartbeat:heartbeaters())).
+
+-spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) ->
+ rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
@@ -168,21 +174,33 @@
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+%% These specs only exists to add no_return() to keep dialyzer happy
+-spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()).
+-spec(start_connection/7 ::
+ (pid(), pid(), pid(), start_heartbeat_fun(), any(),
+ rabbit_networking:socket(),
+ fun ((rabbit_networking:socket()) ->
+ rabbit_types:ok_or_error2(
+ rabbit_networking:socket(), any()))) -> no_return()).
+
-endif.
%%--------------------------------------------------------------------------
-start_link() ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid,
+ Collector, StartHeartbeatFun])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent) ->
+init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
- start_connection(Parent, Deb, Sock, SockTransform)
+ start_connection(
+ Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -208,33 +226,6 @@ info(Pid, Items) ->
emit_stats(Pid) ->
gen_server:cast(Pid, emit_stats).
-setup_profiling() ->
- Value = rabbit_misc:get_config(profiling_enabled, false),
- case Value of
- once ->
- rabbit_log:info("Enabling profiling for this connection, "
- "and disabling for subsequent.~n"),
- rabbit_misc:set_config(profiling_enabled, false),
- fprof:trace(start);
- true ->
- rabbit_log:info("Enabling profiling for this connection.~n"),
- fprof:trace(start);
- false ->
- ok
- end,
- Value.
-
-teardown_profiling(Value) ->
- case Value of
- false ->
- ok;
- _ ->
- rabbit_log:info("Completing profiling for this connection.~n"),
- fprof:trace(stop),
- fprof:profile(),
- fprof:analyse([{dest, []}, {cols, 100}])
- end.
-
conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
@@ -261,7 +252,8 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, Deb, Sock, SockTransform) ->
+start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+ Sock, SockTransform) ->
process_flag(trap_exit, true),
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
PeerAddressS = inet_parse:ntoa(PeerAddress),
@@ -270,28 +262,29 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
- ProfilingValue = setup_profiling(),
- {ok, Collector} = rabbit_queue_collector:start_link(),
try
mainloop(Deb, switch_callback(
- #v1{parent = Parent,
- sock = ClientSock,
- connection = #connection{
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
- client_properties = none,
- protocol = none},
- callback = uninitialized_callback,
- recv_length = 0,
- recv_ref = none,
- connection_state = pre_init,
- queue_collector = Collector,
- heartbeater = none,
- stats_timer =
- rabbit_event:init_stats_timer()},
- handshake, 8))
+ #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ protocol = none,
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
+ client_properties = none},
+ callback = uninitialized_callback,
+ recv_length = 0,
+ recv_ref = none,
+ connection_state = pre_init,
+ queue_collector = Collector,
+ heartbeater = none,
+ stats_timer =
+ rabbit_event:init_stats_timer(),
+ channel_sup_sup_pid = ChannelSupSupPid,
+ start_heartbeat_fun = StartHeartbeatFun
+ },
+ handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
@@ -308,9 +301,6 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue),
- rabbit_misc:unlink_and_capture_exit(Collector),
- rabbit_queue_collector:shutdown(Collector),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
@@ -347,10 +337,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
exit(Reason);
{channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, Channel, Reason} ->
- mainloop(Deb, handle_channel_exit(Channel, Reason, State));
- {'EXIT', Pid, Reason} ->
- mainloop(Deb, handle_dependent_exit(Pid, Reason, State));
+ {channel_exit, ChannelOrFrPid, Reason} ->
+ mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
+ {'EXIT', ChSupPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -443,33 +433,45 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
-handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) ->
- {channel, Channel} = get({chpid, ChPid}),
+handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
+ {channel, Channel} = get({ch_fr_pid, ChFrPid}),
handle_exception(State, Channel, Reason);
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
-handle_dependent_exit(Pid, normal, State) ->
- erase({chpid, Pid}),
- maybe_close(State);
-handle_dependent_exit(Pid, Reason, State) ->
- case channel_cleanup(Pid) of
- undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> maybe_close(handle_exception(State, Channel, Reason))
+handle_dependent_exit(ChSupPid, Reason, State) ->
+ case termination_kind(Reason) of
+ controlled ->
+ case erase({ch_sup_pid, ChSupPid}) of
+ undefined -> ok;
+ {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
+ end,
+ maybe_close(State);
+ uncontrolled ->
+ case channel_cleanup(ChSupPid) of
+ undefined ->
+ exit({abnormal_dependent_exit, ChSupPid, Reason});
+ Channel ->
+ maybe_close(handle_exception(State, Channel, Reason))
+ end
end.
-channel_cleanup(Pid) ->
- case get({chpid, Pid}) of
- undefined -> undefined;
- {channel, Channel} -> erase({channel, Channel}),
- erase({chpid, Pid}),
- Channel
+channel_cleanup(ChSupPid) ->
+ case get({ch_sup_pid, ChSupPid}) of
+ undefined -> undefined;
+ {{channel, Channel}, ChFr} -> erase({channel, Channel}),
+ erase(ChFr),
+ erase({ch_sup_pid, ChSupPid}),
+ Channel
end.
-all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
+all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
+ {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
terminate_channels() ->
- NChannels = length([exit(Pid, normal) || Pid <- all_channels()]),
+ NChannels =
+ length([rabbit_framing_channel:shutdown(ChFrPid)
+ || ChFrPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -487,14 +489,15 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'EXIT', Pid, Reason} ->
- case channel_cleanup(Pid) of
+ {'EXIT', ChSupPid, Reason} ->
+ case channel_cleanup(ChSupPid) of
undefined ->
- exit({abnormal_dependent_exit, Pid, Reason});
+ exit({abnormal_dependent_exit, ChSupPid, Reason});
Channel ->
- case Reason of
- normal -> ok;
- _ ->
+ case termination_kind(Reason) of
+ controlled ->
+ ok;
+ uncontrolled ->
rabbit_log:error(
"connection ~p, channel ~p - "
"error while terminating:~n~p~n",
@@ -519,6 +522,11 @@ maybe_close(State = #v1{connection_state = closing,
maybe_close(State) ->
State.
+termination_kind(normal) -> controlled;
+termination_kind(shutdown) -> controlled;
+termination_kind({shutdown, _Term}) -> controlled;
+termination_kind(_) -> uncontrolled.
+
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
@@ -548,8 +556,8 @@ handle_frame(Type, Channel, Payload,
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
- {chpid, ChPid} ->
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
+ {ch_fr_pid, ChFrPid} ->
+ ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
@@ -732,7 +740,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
- sock = Sock}) ->
+ sock = Sock,
+ start_heartbeat_fun = SHF}) ->
if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
rabbit_misc:protocol_error(
not_allowed, "frame_max=~w < ~w min size",
@@ -742,8 +751,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- Heartbeater = rabbit_heartbeat:start_heartbeat(
- Sock, ClientHeartbeat),
+ Heartbeater = SHF(Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
@@ -765,9 +773,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
State#v1{connection_state = running,
connection = NewConnection}),
- rabbit_event:notify(
- connection_created,
- [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(connection_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -849,21 +856,21 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame,
- State = #v1{queue_collector = Collector}) ->
- #v1{sock = Sock, connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost,
- protocol = Protocol}} = State,
- {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol),
- {ok, ChPid} = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector],
- Protocol),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
+send_to_new_channel(Channel, AnalyzedFrame, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ {ok, ChSupPid, ChFrPid} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {Protocol, Sock, Channel, FrameMax,
+ self(), Username, VHost, Collector}),
+ put({channel, Channel}, {ch_fr_pid, ChFrPid}),
+ put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
+ put({ch_fr_pid, ChFrPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
@@ -931,5 +938,4 @@ amqp_exception_explanation(Text, Expl) ->
end.
internal_emit_stats(State) ->
- rabbit_event:notify(connection_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+ rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index d7d6d0ad..d3396424 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -46,11 +46,17 @@
-type(routing_key() :: binary()).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+-type(qpids() :: [pid()]).
-spec(deliver/2 ::
- ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
+ ([qpids()], rabbit_types:delivery()) -> {routing_result(), [qpids()]}).
-spec(deliver_by_queue_names/2 ::
- ([binary()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
+ ([binary()], rabbit_types:delivery()) -> {routing_result(), [qpids()]}).
+-spec(match_bindings/2 :: (rabbit_exchange:name(),
+ fun ((rabbit_types:binding()) -> boolean())) ->
+ qpids()).
+-spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') ->
+ qpids()).
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1e7f533a..495976a6 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,9 +35,6 @@
-export([all_tests/0, test_parsing/0]).
-%% Exported so the hook mechanism can call back
--export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]).
-
-import(lists).
-include("rabbit.hrl").
@@ -1092,7 +1089,8 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- <<"user">>, <<"/">>, self()),
+ <<"user">>, <<"/">>, self(),
+ fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1133,67 +1131,23 @@ test_server_status() ->
%% cleanup
[{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
+
+ unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
passed.
-test_hooks() ->
- %% Firing of hooks calls all hooks in an isolated manner
- rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}),
- rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}),
- rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}),
- rabbit_hooks:trigger(test_hook, [arg1, arg2]),
- [arg1, arg2] = get(test_hook_test_fired),
- [arg1, arg2] = get(test_hook_test2_fired),
- undefined = get(test_hook2_test2_fired),
-
- %% Hook Deletion works
- put(test_hook_test_fired, undefined),
- put(test_hook_test2_fired, undefined),
- rabbit_hooks:unsubscribe(test_hook, test),
- rabbit_hooks:trigger(test_hook, [arg3, arg4]),
- undefined = get(test_hook_test_fired),
- [arg3, arg4] = get(test_hook_test2_fired),
- undefined = get(test_hook2_test2_fired),
-
- %% Catches exceptions from bad hooks
- rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}),
- ok = rabbit_hooks:trigger(test_hook3, []),
-
- %% Passing extra arguments to hooks
- rabbit_hooks:subscribe(arg_hook, test, {rabbit_tests, extra_arg_hook, [1, 3]}),
- rabbit_hooks:trigger(arg_hook, [arg1, arg2]),
- {[arg1, arg2], 1, 3} = get(arg_hook_test_fired),
-
- %% Invoking Pids
- Remote = fun () ->
- receive
- {rabbitmq_hook,[remote_test,test,[],Target]} ->
- Target ! invoked
- end
- end,
- P = spawn(Remote),
- rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}),
- rabbit_hooks:trigger(remote_test, []),
- receive
- invoked -> ok
- after 100 ->
- io:format("Remote hook not invoked"),
- throw(timeout)
- end,
- passed.
-
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>,
- <<"/">>, self()),
+ {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
+ <<"guest">>, <<"/">>, self(),
+ fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
- MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)
end,
- {Writer, Ch, MRef}.
+ {Writer, Ch}.
test_statistics_receiver(Pid) ->
receive
@@ -1232,7 +1186,7 @@ test_statistics() ->
%% by far the most complex code though.
%% Set up a channel and queue
- {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1),
+ {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1),
rabbit_channel:do(Ch, #'queue.declare'{}),
QName = receive #'queue.declare_ok'{queue = Q0} ->
Q0
@@ -1476,14 +1430,6 @@ delete_log_handlers(Handlers) ->
Handler <- Handlers],
ok.
-handle_hook(HookName, Handler, Args) ->
- A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired",
- put(list_to_atom(A), Args).
-bad_handle_hook(_, _, _) ->
- exit(bad_handle_hook_called).
-extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
- handle_hook(Hookname, Handler, {Args, Extra1, Extra2}).
-
test_supervisor_delayed_restart() ->
test_sup:test_supervisor_delayed_restart().
@@ -1584,7 +1530,7 @@ msg_store_remove(Guids) ->
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L)).
+ rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1647,7 +1593,7 @@ test_msg_store() ->
ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf),
%% read the second half again, just for fun (aka code coverage)
MSCState7 = msg_store_read(Guids2ndHalf, MSCState6),
- ok = rabbit_msg_store:client_terminate(MSCState7),
+ ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE),
%% stop and restart, preserving every other msg in 2nd half
ok = rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start_msg_store(
@@ -1672,7 +1618,7 @@ test_msg_store() ->
{ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
- msg_store_read(Guids1stHalf, MSCState9)),
+ msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE),
ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf),
%% restart empty
restart_msg_store_empty(), %% now safe to reuse guids
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 47e8bb01..9dfd33bd 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -118,7 +118,8 @@
-type(binding() ::
#binding{exchange_name :: rabbit_exchange:name(),
queue_name :: rabbit_amqqueue:name(),
- key :: rabbit_exchange:binding_key()}).
+ key :: rabbit_binding:key(),
+ args :: rabbit_framing:amqp_table()}).
-type(amqqueue() ::
#amqqueue{name :: rabbit_amqqueue:name(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0f52eee8..30d3a8ae 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -439,9 +439,10 @@ terminate(State) ->
remove_pending_ack(true, tx_commit_index(State)),
case MSCStateP of
undefined -> ok;
- _ -> rabbit_msg_store:client_terminate(MSCStateP)
+ _ -> rabbit_msg_store:client_terminate(
+ MSCStateP, ?PERSISTENT_MSG_STORE)
end,
- rabbit_msg_store:client_terminate(MSCStateT),
+ rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE),
Terms = [{persistent_ref, PRef},
{transient_ref, TRef},
{persistent_count, PCount}],
@@ -464,8 +465,7 @@ delete_and_terminate(State) ->
case MSCStateP of
undefined -> ok;
_ -> rabbit_msg_store:client_delete_and_terminate(
- MSCStateP, ?PERSISTENT_MSG_STORE, PRef),
- rabbit_msg_store:client_terminate(MSCStateP)
+ MSCStateP, ?PERSISTENT_MSG_STORE, PRef)
end,
rabbit_msg_store:client_delete_and_terminate(
MSCStateT, ?TRANSIENT_MSG_STORE, TRef),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f90ee734..aa986e54 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,9 +33,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/4, start_link/4, shutdown/1, mainloop/1]).
--export([send_command/2, send_command/3, send_command_and_signal_back/3,
- send_command_and_signal_back/4, send_command_and_notify/5]).
+-export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([send_command/2, send_command/3, send_command_sync/2,
+ send_command_sync/3, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
-import(gen_tcp).
@@ -48,24 +48,23 @@
-ifdef(use_specs).
--spec(start/4 ::
+-spec(start/5 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol())
+ non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
--spec(start_link/4 ::
+-spec(start_link/5 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol())
+ non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
-> 'ok').
--spec(send_command_and_signal_back/3 ::
- (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok').
--spec(send_command_and_signal_back/4 ::
- (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid())
- -> 'ok').
+-spec(send_command_sync/2 ::
+ (pid(), rabbit_framing:amqp_method()) -> 'ok').
+-spec(send_command_sync/3 ::
+ (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
@@ -84,68 +83,61 @@
%%----------------------------------------------------------------------------
-start(Sock, Channel, FrameMax, Protocol) ->
- {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}])}.
-
-start_link(Sock, Channel, FrameMax, Protocol) ->
- {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ {ok,
+ proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
+ #wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}])}.
-mainloop(State) ->
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ {ok,
+ proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
+ #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
+
+mainloop(ReaderPid, State) ->
+ try
+ mainloop1(ReaderPid, State)
+ catch
+ exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error}
+ end,
+ done.
+
+mainloop1(ReaderPid, State) ->
receive
- Message -> ?MODULE:mainloop(handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [State])
+ erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
end.
-handle_message({send_command, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
+handle_message({send_command, MethodRecord}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
State;
-handle_message({send_command, MethodRecord, Content},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
+handle_message({send_command, MethodRecord, Content}, State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
State;
-handle_message({send_command_and_signal_back, MethodRecord, Parent},
- State = #wstate{sock = Sock, channel = Channel,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
- Parent ! rabbit_writer_send_command_signal,
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
+ gen_server:reply(From, ok),
State;
-handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
- Parent ! rabbit_writer_send_command_signal,
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
+ State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
+ gen_server:reply(From, ok),
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
+ State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
-handle_message(shutdown, _State) ->
- exit(normal);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
@@ -159,29 +151,28 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
-send_command_and_signal_back(W, MethodRecord, Parent) ->
- W ! {send_command_and_signal_back, MethodRecord, Parent},
- ok.
+send_command_sync(W, MethodRecord) ->
+ call(W, {send_command_sync, MethodRecord}).
-send_command_and_signal_back(W, MethodRecord, Content, Parent) ->
- W ! {send_command_and_signal_back, MethodRecord, Content, Parent},
- ok.
+send_command_sync(W, MethodRecord, Content) ->
+ call(W, {send_command_sync, MethodRecord, Content}).
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
-shutdown(W) ->
- W ! shutdown,
- rabbit_misc:unlink_and_capture_exit(W),
- ok.
+%---------------------------------------------------------------------------
+
+call(Pid, Msg) ->
+ {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
+ Res.
%---------------------------------------------------------------------------
assemble_frames(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
- rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord,
- Protocol).
+ rabbit_binary_generator:build_simple_method_frame(
+ Channel, MethodRecord, Protocol).
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, Content),
@@ -223,12 +214,18 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(Sock, Channel, MethodRecord, Protocol) ->
+internal_send_command_async(MethodRecord,
+ #wstate{sock = Sock,
+ channel = Channel,
+ protocol = Protocol}) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
ok.
-internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax,
- Protocol) ->
+internal_send_command_async(MethodRecord, Content,
+ #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
Content, FrameMax, Protocol)),
ok.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 87883037..4a1c5832 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -63,7 +63,7 @@
-export([start_link/2,start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
- which_children/1,
+ which_children/1, find_child/2,
check_childspecs/1]).
-export([behaviour_info/1]).
@@ -138,6 +138,10 @@ terminate_child(Supervisor, Name) ->
which_children(Supervisor) ->
call(Supervisor, which_children).
+find_child(Supervisor, Name) ->
+ [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor),
+ Name1 =:= Name].
+
call(Supervisor, Req) ->
gen_server:call(Supervisor, Req, infinity).
diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl
index 1b785843..02d7e0e4 100644
--- a/src/tcp_client_sup.erl
+++ b/src/tcp_client_sup.erl
@@ -31,19 +31,19 @@
-module(tcp_client_sup).
--behaviour(supervisor).
+-behaviour(supervisor2).
-export([start_link/1, start_link/2]).
-export([init/1]).
start_link(Callback) ->
- supervisor:start_link(?MODULE, Callback).
+ supervisor2:start_link(?MODULE, Callback).
start_link(SupName, Callback) ->
- supervisor:start_link(SupName, ?MODULE, Callback).
+ supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one, 10, 10},
+ {ok, {{simple_one_for_one_terminate, 10, 10},
[{tcp_client, {M,F,A},
- temporary, brutal_kill, worker, [M]}]}}.
+ temporary, infinity, supervisor, [M]}]}}.