summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author    Simon MacMullen <simon@rabbitmq.com> 2014-03-26 12:17:13 +0000
committer Simon MacMullen <simon@rabbitmq.com> 2014-03-26 12:17:13 +0000
commit    0347d6b46842cbe037c6b9129524668e093926c4 (patch)
tree      f319ab61aa37aca923b0c49fbfad6b2ff7aace67
parent    5fec171569615567a830cee2493932df37596c52 (diff)
parent    f2e1729521131549b6c2f0d12106f299b4a1901a (diff)
download  rabbitmq-server-0347d6b46842cbe037c6b9129524668e093926c4.tar.gz
Merge bug26069
-rw-r--r--  src/rabbit_amqqueue.erl     |   6
-rw-r--r--  src/rabbit_channel.erl      | 176
-rw-r--r--  src/rabbit_disk_monitor.erl |   2
-rw-r--r--  src/rabbit_prelaunch.erl    |  11
4 files changed, 114 insertions(+), 81 deletions(-)
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 85d1f283..757f18ac 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -436,7 +436,8 @@ declare_args() ->
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
{<<"x-max-length">>, fun check_non_neg_int_arg/2}].
-consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}].
+consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
+ {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
@@ -444,6 +445,9 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
+check_bool_arg({bool, _}, _) -> ok;
+check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}.
+
check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val >= 0 -> ok;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 56a3cbb6..b9b39ac3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -755,9 +755,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait,
arguments = Args},
- _, State = #ch{conn_pid = ConnPid,
- limiter = Limiter,
- consumer_prefetch = ConsumerPrefetchCount,
+ _, State = #ch{consumer_prefetch = ConsumerPrefetch,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -769,34 +767,12 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
"amq.ctag");
Other -> Other
end,
-
- %% We get the queue process to send the consume_ok on our
- %% behalf. This is for symmetry with basic.cancel - see
- %% the comment in that method for why.
- case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnPid,
- fun (Q) ->
- {rabbit_amqqueue:basic_consume(
- Q, NoAck, self(),
- rabbit_limiter:pid(Limiter),
- rabbit_limiter:is_active(Limiter),
- ConsumerPrefetchCount,
- ActualConsumerTag, ExclusiveConsume, Args,
- ok_msg(NoWait, #'basic.consume_ok'{
- consumer_tag = ActualConsumerTag})),
- Q}
- end) of
- {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
- CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
- State1 = monitor_delivering_queue(
- NoAck, QPid, QName,
- State#ch{consumer_mapping = CM1}),
- {noreply,
- case NoWait of
- true -> consumer_monitor(ActualConsumerTag, State1);
- false -> State1
- end};
- {{error, exclusive_consume_unavailable}, _Q} ->
+ case basic_consume(
+ QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args, NoWait, State) of
+ {ok, State1} ->
+ {noreply, State1};
+ {error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
[rabbit_misc:rs(QueueName)])
@@ -815,7 +791,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, Q = #amqqueue{pid = QPid}} ->
+ {ok, {Q = #amqqueue{pid = QPid}, _CParams}} ->
ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
QCons1 =
case dict:find(QPid, QCons) of
@@ -1174,10 +1150,11 @@ handle_method(#'basic.credit'{consumer_tag = CTag,
drain = Drain},
_, State = #ch{consumer_mapping = Consumers}) ->
case dict:find(CTag, Consumers) of
- {ok, Q} -> ok = rabbit_amqqueue:credit(
- Q, self(), CTag, Credit, Drain),
- {noreply, State};
- error -> precondition_failed("unknown consumer tag '~s'", [CTag])
+ {ok, {Q, _CParams}} -> ok = rabbit_amqqueue:credit(
+ Q, self(), CTag, Credit, Drain),
+ {noreply, State};
+ error -> precondition_failed(
+ "unknown consumer tag '~s'", [CTag])
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1186,26 +1163,55 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+%% We get the queue process to send the consume_ok on our behalf. This
+%% is for symmetry with basic.cancel - see the comment in that method
+%% for why.
+basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args, NoWait,
+ State = #ch{conn_pid = ConnPid,
+ limiter = Limiter,
+ consumer_mapping = ConsumerMapping}) ->
+ case rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ConnPid,
+ fun (Q) ->
+ {rabbit_amqqueue:basic_consume(
+ Q, NoAck, self(),
+ rabbit_limiter:pid(Limiter),
+ rabbit_limiter:is_active(Limiter),
+ ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args,
+ ok_msg(NoWait, #'basic.consume_ok'{
+ consumer_tag = ActualConsumerTag})),
+ Q}
+ end) of
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
+ CM1 = dict:store(
+ ActualConsumerTag,
+ {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
+ ConsumerMapping),
+ State1 = monitor_delivering_queue(
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
+ {ok, case NoWait of
+ true -> consumer_monitor(ActualConsumerTag, State1);
+ false -> State1
+ end};
+ {{error, exclusive_consume_unavailable} = E, _Q} ->
+ E
+ end.
+
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
- queue_consumers = QCons,
- capabilities = Capabilities}) ->
- case rabbit_misc:table_lookup(
- Capabilities, <<"consumer_cancel_notify">>) of
- {bool, true} ->
- #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
- QCons1 = dict:update(QPid,
- fun (CTags) ->
- gb_sets:insert(ConsumerTag, CTags)
- end,
- gb_sets:singleton(ConsumerTag),
- QCons),
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
- queue_consumers = QCons1};
- _ ->
- State
- end.
+ queue_consumers = QCons}) ->
+ {#amqqueue{pid = QPid}, _CParams} =
+ dict:fetch(ConsumerTag, ConsumerMapping),
+ QCons1 = dict:update(QPid, fun (CTags) ->
+ gb_sets:insert(ConsumerTag, CTags)
+ end,
+ gb_sets:singleton(ConsumerTag), QCons),
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ queue_consumers = QCons1}.
monitor_delivering_queue(NoAck, QPid, QName,
State = #ch{queue_names = QNames,
@@ -1231,28 +1237,52 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
end.
-handle_consuming_queue_down(QPid,
- State = #ch{consumer_mapping = ConsumerMapping,
- queue_consumers = QCons,
- queue_names = QNames}) ->
+handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons,
+ queue_names = QNames}) ->
ConsumerTags = case dict:find(QPid, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
- ConsumerMapping1 =
- gb_sets:fold(fun (CTag, CMap) ->
- ok = send(#'basic.cancel'{consumer_tag = CTag,
- nowait = true},
- State),
- rabbit_event:notify(
- consumer_deleted,
- [{consumer_tag, CTag},
- {channel, self()},
- {queue, dict:fetch(QPid, QNames)}]),
- dict:erase(CTag, CMap)
- end, ConsumerMapping, ConsumerTags),
- State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = dict:erase(QPid, QCons)}.
+ gb_sets:fold(
+ fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
+ QName = dict:fetch(QPid, QNames),
+ case queue_down_consumer_action(CTag, CMap) of
+ remove ->
+ cancel_consumer(CTag, QName, StateN);
+ {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} ->
+ case catch basic_consume( %% [0]
+ QName, NoAck, ConsumerPrefetch, CTag,
+ Exclusive, Args, true, StateN) of
+ {ok, StateN1} -> StateN1;
+ _ -> cancel_consumer(CTag, QName, StateN)
+ end
+ end
+ end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags).
+
+%% [0] There is a slight danger here that if a queue is deleted and
+%% then recreated again the reconsume will succeed even though it was
+%% not an HA failover. But the likelihood is not great and most users
+%% are unlikely to care.
+
+cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities,
+ consumer_mapping = CMap}) ->
+ case rabbit_misc:table_lookup(
+ Capabilities, <<"consumer_cancel_notify">>) of
+ {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
+ nowait = true}, State);
+ _ -> ok
+ end,
+ rabbit_event:notify(consumer_deleted, [{consumer_tag, CTag},
+ {channel, self()},
+ {queue, QName}]),
+ State#ch{consumer_mapping = dict:erase(CTag, CMap)}.
+
+queue_down_consumer_action(CTag, CMap) ->
+ {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap),
+ case rabbit_misc:table_lookup(Args, <<"x-cancel-on-ha-failover">>) of
+ {bool, true} -> remove;
+ _ -> {recover, ConsumeSpec}
+ end.
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
@@ -1427,8 +1457,8 @@ foreach_per_queue(F, UAL) ->
rabbit_misc:gb_trees_foreach(F, T).
consumer_queues(Consumers) ->
- lists:usort([QPid ||
- {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
+ lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}}
+ <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index d9c29646..4a2337b9 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -186,7 +186,7 @@ get_disk_free(Dir, {unix, Sun})
get_disk_free(Dir, {unix, _}) ->
parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
- parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ Dir ++ [$"]));
+ parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ Dir ++ "\""));
get_disk_free(_, Platform) ->
{unknown, Platform}.
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 4037ed44..5e2ef1e8 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -44,7 +44,7 @@ start() ->
[NodeStr] ->
Node = rabbit_nodes:make(NodeStr),
{NodeName, NodeHost} = rabbit_nodes:parts(Node),
- ok = duplicate_node_check(Node, NodeName, NodeHost),
+ ok = duplicate_node_check(NodeName, NodeHost),
ok = dist_port_set_check(),
ok = dist_port_use_check(NodeHost);
[] ->
@@ -61,14 +61,13 @@ stop() ->
%%----------------------------------------------------------------------------
%% Check whether a node with the same name is already running
-duplicate_node_check(Node, NodeName, NodeHost) ->
+duplicate_node_check(NodeName, NodeHost) ->
case rabbit_nodes:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
- true -> io:format("ERROR: node with name ~p "
- "already running on ~p~n",
- [NodeName, NodeHost]),
- io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"),
+ true -> io:format(
+ "ERROR: node with name ~p already running on ~p~n",
+ [NodeName, NodeHost]),
rabbit_misc:quit(?ERROR_CODE);
false -> ok
end;