summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-03-21 13:41:15 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-03-21 13:41:15 +0000
commitc5b7be1527eb01264b4037e18c70bb3589e5e5fc (patch)
treed63e4f6a3834d83f8a000acaec3ceac2c56da7bc
parentc112d3cfc8aa423f1bb729b6cf95579008c116cb (diff)
parentf6c6e6130bdcdaf82b351b84eb070f44b37356e9 (diff)
downloadrabbitmq-server-c5b7be1527eb01264b4037e18c70bb3589e5e5fc.tar.gz
Merge in default
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_exchange.erl36
-rw-r--r--src/rabbit_exchange_decorator.erl12
-rw-r--r--src/rabbit_node_monitor.erl5
-rw-r--r--src/rabbit_reader.erl23
-rw-r--r--src/rabbit_registry.erl39
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_vhost.erl2
9 files changed, 81 insertions, 42 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bff762f3..c6a8bf2f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -168,6 +168,8 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
+ rabbit_event:if_enabled(State, #q.stats_timer,
+ fun() -> emit_stats(State) end),
rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5af112e2..79a71b8d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -395,6 +395,8 @@ terminate(Reason, State) ->
_ -> ok
end,
pg_local:leave(rabbit_channels, self()),
+ rabbit_event:if_enabled(State, #ch.stats_timer,
+ fun() -> emit_stats(State) end),
rabbit_event:notify(channel_closed, [{pid, self()}]).
code_change(_OldVsn, State, _Extra) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 5f4fb9ec..9e98448d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -326,34 +326,34 @@ route(#exchange{name = #resource{virtual_host = VHost,
%% Optimisation
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
{Decorators, _} ->
- QNames = route1(Delivery, {[X], XName, []}),
- lists:usort(decorate_route(Decorators, X, Delivery, QNames))
+ lists:usort(route1(Delivery, Decorators, {[X], XName, []}))
end.
-decorate_route([], _X, _Delivery, QNames) ->
+route1(_, _, {[], _, QNames}) ->
QNames;
-decorate_route(Decorators, X, Delivery, QNames) ->
- QNames ++
- lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]).
-
-route1(_, {[], _, QNames}) ->
- QNames;
-route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
- DstNames = process_alternate(
- X, ((type_to_module(Type)):route(X, Delivery))),
- route1(Delivery,
+route1(Delivery, Decorators,
+ {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
+ ExchangeDests = (type_to_module(Type)):route(X, Delivery),
+ DecorateDests = process_decorators(X, Decorators, Delivery),
+ AlternateDests = process_alternate(X, ExchangeDests),
+ route1(Delivery, Decorators,
lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
- DstNames)).
+ AlternateDests ++ DecorateDests ++ ExchangeDests)).
-process_alternate(#exchange{arguments = []}, Results) -> %% optimisation
- Results;
+process_alternate(#exchange{arguments = []}, _Results) -> %% optimisation
+ [];
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
undefined -> [];
AName -> [AName]
end;
-process_alternate(_X, Results) ->
- Results.
+process_alternate(_X, _Results) ->
+ [].
+
+process_decorators(_, [], _) -> %% optimisation
+ [];
+process_decorators(X, Decorators, Delivery) ->
+ lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]).
process_route(#resource{kind = exchange} = XName,
{_WorkList, XName, _QNames} = Acc) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 8f17adfc..040b55db 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -57,12 +57,10 @@
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
-%% called after exchange routing
-%% return value is a list of queues to be added to the list of
-%% destination queues. decorators must register separately for
-%% this callback using exchange_decorator_route.
--callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
- [rabbit_amqqueue:name()].
+%% Decorators can optionally implement route/2 which allows additional
+%% destinations to be added to the routing decision.
+%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+%% [rabbit_amqqueue:name() | rabbit_exchange:name()].
-else.
@@ -70,7 +68,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, {route, 2}];
+ {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index de53b7f0..3872f3df 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -318,6 +318,9 @@ alive_nodes() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
[N || N <- Nodes, pong =:= net_adm:ping(N)].
+alive_rabbit_nodes() ->
+ [N || N <- alive_nodes(), rabbit_nodes:is_running(N, rabbit)].
+
await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
[]),
@@ -346,7 +349,7 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
%% that we do not attempt to deal with individual (other) partitions
%% going away. It's only safe to forget anything about partitions when
%% there are no partitions.
- Partitions1 = case Partitions -- (Partitions -- alive_nodes()) of
+ Partitions1 = case Partitions -- (Partitions -- alive_rabbit_nodes()) of
[] -> [];
_ -> Partitions
end,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index aaaa179a..61fac0e2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -306,8 +306,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
+ maybe_emit_stats(State),
throw(connection_closed_abruptly);
{error, Reason} ->
+ maybe_emit_stats(State),
throw({inet_error, Reason});
{other, {system, From, Request}} ->
sys:handle_system_msg(Request, From, State#v1.parent,
@@ -338,23 +340,28 @@ handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
%% ordinary error case. However, since this termination is
%% initiated by our parent it is probably more important to exit
%% quickly.
+ maybe_emit_stats(State),
exit(Reason);
-handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) ->
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) ->
+ maybe_emit_stats(State),
throw(E);
handle_other({channel_exit, Channel, Reason}, State) ->
handle_exception(State, Channel, Reason);
handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
handle_dependent_exit(ChPid, Reason, State);
-handle_other(terminate_connection, _State) ->
+handle_other(terminate_connection, State) ->
+ maybe_emit_stats(State),
stop;
handle_other(handshake_timeout, State)
when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
State;
handle_other(handshake_timeout, State) ->
+ maybe_emit_stats(State),
throw({handshake_timeout, State#v1.callback});
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
State;
-handle_other(heartbeat_timeout, #v1{connection_state = S}) ->
+handle_other(heartbeat_timeout, State = #v1{connection_state = S}) ->
+ maybe_emit_stats(State),
throw({heartbeat_timeout, S});
handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
@@ -386,8 +393,9 @@ handle_other(emit_stats, State) ->
handle_other({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
control_throttle(State);
-handle_other(Other, _State) ->
+handle_other(Other, State) ->
%% internal error -> something worth dying for
+ maybe_emit_stats(State),
exit({unexpected_message, Other}).
switch_callback(State, Callback, Length) ->
@@ -850,8 +858,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
- rabbit_event:if_enabled(State1, #v1.stats_timer,
- fun() -> emit_stats(State1) end),
+ maybe_emit_stats(State1),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
@@ -1010,6 +1017,10 @@ cert_info(F, #v1{sock = Sock}) ->
{ok, Cert} -> list_to_binary(F(Cert))
end.
+maybe_emit_stats(State) ->
+ rabbit_event:if_enabled(State, #v1.stats_timer,
+ fun() -> emit_stats(State) end).
+
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 3514e780..acdc2cff 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -84,12 +84,34 @@ internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
internal_register(Class, TypeName, ModuleName)
when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) ->
ok = sanity_check_module(class_module(Class), ModuleName),
- true = ets:insert(?ETS_NAME,
- {{Class, internal_binary_to_type(TypeName)}, ModuleName}),
+ RegArg = {{Class, internal_binary_to_type(TypeName)}, ModuleName},
+ true = ets:insert(?ETS_NAME, RegArg),
+ conditional_register(RegArg),
ok.
internal_unregister(Class, TypeName) ->
- true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}),
+ UnregArg = {Class, internal_binary_to_type(TypeName)},
+ conditional_unregister(UnregArg),
+ true = ets:delete(?ETS_NAME, UnregArg),
+ ok.
+
+%% register exchange decorator route callback only when implemented,
+%% in order to avoid unnecessary decorator calls on the fast
+%% publishing path
+conditional_register({{exchange_decorator, Type}, ModuleName}) ->
+ case erlang:function_exported(ModuleName, route, 2) of
+ true -> true = ets:insert(?ETS_NAME,
+ {{exchange_decorator_route, Type},
+ ModuleName});
+ false -> ok
+ end;
+conditional_register(_) ->
+ ok.
+
+conditional_unregister({exchange_decorator, Type}) ->
+ true = ets:delete(?ETS_NAME, {exchange_decorator_route, Type}),
+ ok;
+conditional_unregister(_) ->
ok.
sanity_check_module(ClassModule, Module) ->
@@ -104,12 +126,11 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator;
-class_module(exchange_decorator_route) -> rabbit_exchange_decorator;
-class_module(policy_validator) -> rabbit_policy_validator.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(policy_validator) -> rabbit_policy_validator.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7ecea035..44b7fc4a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1600,7 +1600,7 @@ control_action(Command, Node, Args, Opts) ->
info_action(Command, Args, CheckVHost) ->
ok = control_action(Command, []),
- if CheckVHost -> ok = control_action(Command, []);
+ if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]);
true -> ok
end,
ok = control_action(Command, lists:map(fun atom_to_list/1, Args)),
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index d0f39221..2858cf58 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -70,6 +70,7 @@ add(VHostPath) ->
{<<"amq.rabbitmq.trace">>, topic}]],
ok
end),
+ rabbit_event:notify(vhost_created, info(VHostPath)),
R.
delete(VHostPath) ->
@@ -87,6 +88,7 @@ delete(VHostPath) ->
with(VHostPath, fun () ->
ok = internal_delete(VHostPath)
end)),
+ ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
R.
internal_delete(VHostPath) ->