summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-06-09 14:35:25 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-06-09 14:35:25 +0100
commit0ec26e939b49cf7a59c2a0acb0388f37c052eba3 (patch)
tree13afc7d8ff8d140cb00ada104b190c5300310ec8
parent66dc4d62ed98f7391980a53c3d10baec9b9b43b2 (diff)
parent9a42294852e7ed823eb90ebd726fb58760cc5bae (diff)
downloadrabbitmq-server-0ec26e939b49cf7a59c2a0acb0388f37c052eba3.tar.gz
stable to default
-rw-r--r--docs/rabbitmq.config.example14
-rw-r--r--include/rabbit.hrl3
-rwxr-xr-xquickcheck3
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_channel_interceptor.erl25
-rw-r--r--src/rabbit_dead_letter.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl20
-rw-r--r--src/rabbit_mirror_queue_misc.erl49
-rw-r--r--src/rabbit_mirror_queue_slave.erl5
-rw-r--r--src/rabbit_misc.erl31
-rw-r--r--src/rabbit_policies.erl4
11 files changed, 100 insertions, 61 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index b0e13b1b..26de71b7 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -257,9 +257,13 @@
%% {certfile, "/path/to/cert.pem"},
%% {keyfile, "/path/to/key.pem"}]}]},
+ %% One of 'basic', 'detailed' or 'none'. See
+ %% http://www.rabbitmq.com/management.html#fine-stats for more details.
+ %% {rates_mode, basic},
+
%% Configure how long aggregated data (such as message rates and queue
%% lengths) is retained. Please read the plugin's documentation in
- %% https://www.rabbitmq.com/management.html#configuration for more
+ %% http://www.rabbitmq.com/management.html#configuration for more
%% details.
%%
%% {sample_retention_policies,
@@ -268,14 +272,6 @@
%% {detailed, [{10, 5}]}]}
]},
- {rabbitmq_management_agent,
- [%% Misc/Advanced Options
- %%
- %% NB: Change these only if you understand what you are doing!
- %%
- %% {force_fine_statistics, true}
- ]},
-
%% ----------------------------------------------------------------------------
%% RabbitMQ Shovel Plugin
%%
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5ac3197e..ee1da6b2 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -105,9 +105,6 @@
-define(DESIRED_HIBERNATE, 10000).
-define(CREDIT_DISC_BOUND, {2000, 500}).
-%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
--define(MAX_EXPIRY_TIMER, 4294967295).
-
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/quickcheck b/quickcheck
index b5382d75..40f13091 100755
--- a/quickcheck
+++ b/quickcheck
@@ -17,7 +17,8 @@ main([NodeStr, ModStr, TrialsStr]) ->
case rpc:call(Node, proper, module,
[Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of
[] -> ok;
- _ -> quit(1)
+ R -> io:format("~p.~n", [R]),
+ quit(1)
end;
{badrpc, Reason} ->
io:format("Could not contact node ~p: ~p.~n", [Node, Reason]),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9b785303..753d8e15 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -385,12 +385,12 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
- TRef = erlang:send_after(After, self(), {drop_expired, Version}),
+ TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}),
State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ttl_timer_expiry = TExpiry})
when Expiry + 1000 < TExpiry ->
- case erlang:cancel_timer(TRef) of
+ case rabbit_misc:cancel_timer(TRef) of
false -> State;
_ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
end;
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 81c17fbf..db9349ac 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -33,7 +33,7 @@
-callback description() -> [proplists:property()].
-callback intercept(original_method(), rabbit_types:vhost()) ->
- rabbit_types:ok_or_error2(processed_method(), any()).
+ processed_method() | rabbit_misc:channel_or_connection_exit().
%% Whether the interceptor wishes to intercept the amqp method
-callback applies_to(intercept_method()) -> boolean().
@@ -62,20 +62,15 @@ intercept_method(M, VHost) ->
intercept_method(M, _VHost, []) ->
M;
intercept_method(M, VHost, [I]) ->
- case I:intercept(M, VHost) of
- {ok, M2} ->
- case validate_method(M, M2) of
- true ->
- M2;
- _ ->
- internal_error("Interceptor: ~p expected "
- "to return method: ~p but returned: ~p",
- [I, rabbit_misc:method_record_type(M),
- rabbit_misc:method_record_type(M2)])
- end;
- {error, Reason} ->
- internal_error("Interceptor: ~p failed with reason: ~p",
- [I, Reason])
+ M2 = I:intercept(M, VHost),
+ case validate_method(M, M2) of
+ true ->
+ M2;
+ _ ->
+ internal_error("Interceptor: ~p expected "
+ "to return method: ~p but returned: ~p",
+ [I, rabbit_misc:method_record_type(M),
+ rabbit_misc:method_record_type(M2)])
end;
intercept_method(M, _VHost, Is) ->
internal_error("More than one interceptor for method: ~p -- ~p",
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index ec32e687..728bc431 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) ->
{longstr, <<"rejected">>} =/=
rabbit_misc:table_lookup(D, <<"reason">>);
(_) ->
+ %% There was something we didn't expect, therefore
+ %% a client must have put it there, therefore the
+ %% cycle was not "fully automatic".
false
end, Cycle ++ [H])
end.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 2b16b911..24b22d4c 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason,
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
terminate(Reason,
- State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State = #state { name = QName,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
%% shouldn't be deleted. Most likely safe shutdown of this
- %% node. Thus just let some other slave take over.
+ %% node.
+ {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(QName),
+ case SSPids =:= [] andalso
+ rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of
+ true -> %% Remove the whole queue to avoid data loss
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Stopping all nodes on master shutdown since no "
+ "synchronised slave is available~n", []),
+ stop_all_slaves(Reason, State);
+ false -> %% Just let some other slave take over.
+ ok
+ end,
State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
@@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, State),
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
-stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b0f092a9..7aec1ac8 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -29,16 +29,19 @@
-include("rabbit.hrl").
--rabbit_boot_step({?MODULE,
- [{description, "HA policy validation"},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-mode">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-params">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, recovery}]}).
+-rabbit_boot_step(
+ {?MODULE,
+ [{description, "HA policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
%%----------------------------------------------------------------------------
@@ -374,16 +377,21 @@ validate_policy(KeyList) ->
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
- case {Mode, Params, SyncMode} of
- {none, none, none} ->
+ PromoteOnShutdown = proplists:get_value(
+ <<"ha-promote-on-shutdown">>, KeyList, none),
+ case {Mode, Params, SyncMode, PromoteOnShutdown} of
+ {none, none, none, none} ->
ok;
- {none, _, _} ->
- {error, "ha-mode must be specified to specify ha-params or "
- "ha-sync-mode", []};
+ {none, _, _, _} ->
+ {error, "ha-mode must be specified to specify ha-params, "
+ "ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
case module(Mode) of
{ok, M} -> case M:validate_policy(Params) of
- ok -> validate_sync_mode(SyncMode);
+ ok -> case validate_sync_mode(SyncMode) of
+ ok -> validate_pos(PromoteOnShutdown);
+ E -> E
+ end;
E -> E
end;
_ -> {error, "~p is not a valid ha-mode value", [Mode]}
@@ -398,3 +406,12 @@ validate_sync_mode(SyncMode) ->
Mode -> {error, "ha-sync-mode must be \"manual\" "
"or \"automatic\", got ~p", [Mode]}
end.
+
+validate_pos(PromoteOnShutdown) ->
+ case PromoteOnShutdown of
+ <<"always">> -> ok;
+ <<"when-synced">> -> ok;
+ none -> ok;
+ Mode -> {error, "ha-promote-on-shutdown must be "
+ "\"always\" or \"when-synced\", got ~p", [Mode]}
+ end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 11d6a79c..cc06ae44 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -653,8 +653,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
-backing_queue_timeout(State = #state { backing_queue = BQ }) ->
- run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
+backing_queue_timeout(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State#state{backing_queue_state = BQ:timeout(BQS)}.
ensure_sync_timer(State) ->
rabbit_misc:ensure_timer(State, #state.sync_timer_ref,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 58e93a3f..cff16040 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -67,7 +67,7 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
--export([ensure_timer/4, stop_timer/2]).
+-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([moving_average/4]).
@@ -81,7 +81,7 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1]).
+-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -94,6 +94,7 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -245,6 +246,8 @@
-> {any(), non_neg_integer()}).
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
+-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()).
+-spec(cancel_timer/1 :: (tref()) -> 'ok').
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
@@ -1012,7 +1015,6 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
-check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}};
check_expiry(N) when N < 0 -> {error, {value_negative, N}};
check_expiry(_N) -> ok.
@@ -1040,7 +1042,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
ensure_timer(State, Idx, After, Msg) ->
case element(Idx, State) of
- undefined -> TRef = erlang:send_after(After, self(), Msg),
+ undefined -> TRef = send_after(After, self(), Msg),
setelement(Idx, State, TRef);
_ -> State
end.
@@ -1048,12 +1050,25 @@ ensure_timer(State, Idx, After, Msg) ->
stop_timer(State, Idx) ->
case element(Idx, State) of
undefined -> State;
- TRef -> case erlang:cancel_timer(TRef) of
- false -> State;
- _ -> setelement(Idx, State, undefined)
- end
+ TRef -> cancel_timer(TRef),
+ setelement(Idx, State, undefined)
end.
+%% timer:send_after/3 goes through a single timer process but allows
+%% long delays. erlang:send_after/3 does not have a bottleneck but
+%% only allows max 2^32-1 millis.
+-define(MAX_ERLANG_SEND_AFTER, 4294967295).
+send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER ->
+ {ok, Ref} = timer:send_after(Millis, Pid, Msg),
+ {timer, Ref};
+send_after(Millis, Pid, Msg) ->
+ {erlang, erlang:send_after(Millis, Pid, Msg)}.
+
+cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref),
+ ok;
+cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref),
+ ok.
+
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index fe2b766f..3558cf98 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -61,13 +61,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value) ->
{error, "~p is not a valid dead letter routing key", [Value]};
validate_policy0(<<"message-ttl">>, Value)
- when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"message-ttl">>, Value) ->
{error, "~p is not a valid message TTL", [Value]};
validate_policy0(<<"expires">>, Value)
- when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 1 ->
ok;
validate_policy0(<<"expires">>, Value) ->
{error, "~p is not a valid queue expiry", [Value]};