summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-07 14:44:57 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-07 14:44:57 +0000
commit97e5e2486f86a1a1d4ff7ec948def5fbbcb1c058 (patch)
treec26fc611cbf622cd31bdad06030fcff9483c4771
parent76be3e0ad26de2a00a061fe4dfedf26fb1fa40a6 (diff)
parent55494a8fe0850e22c57609e41f6c525a80064991 (diff)
downloadrabbitmq-server-97e5e2486f86a1a1d4ff7ec948def5fbbcb1c058.tar.gz
Merging default into bug21647
-rw-r--r--src/rabbit_channel.erl166
-rw-r--r--src/rabbit_reader.erl3
2 files changed, 109 insertions, 60 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 526fb428..b17f7a01 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,9 +33,9 @@
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm,
- confirmed, capabilities}).
+ consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
+ stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
+ unconfirmed_qm, confirmed, capabilities}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -176,6 +176,7 @@ init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
+ consumer_monitors = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
@@ -247,6 +248,11 @@ handle_cast(ready_for_close, State = #ch{state = closing,
handle_cast(terminate, State) ->
{stop, normal, State};
+handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
+ State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(WriterPid, Msg),
+ noreply(monitor_consumer(ConsumerTag, State));
+
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
noreply(State);
@@ -288,23 +294,15 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
-handle_info({'DOWN', _MRef, process, QPid, Reason},
- State = #ch{unconfirmed_qm = UQM}) ->
- MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSet} -> gb_sets:to_list(MsgSet);
- none -> []
- end,
- %% We remove the MsgSeqNos from UQM before calling
- %% process_confirms to prevent each MsgSeqNo being removed from
- %% the set one by one which which would be inefficient
- State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
- erase_queue_stats(QPid),
- State3 = (case Reason of
- normal -> fun record_confirms/2;
- _ -> fun send_nacks/2
- end)(MXs, State2),
- noreply(queue_blocked(QPid, State3)).
+handle_info({'DOWN', MRef, process, QPid, Reason},
+ State = #ch{consumer_monitors = ConsumerMonitors}) ->
+ noreply(
+ case dict:find(MRef, ConsumerMonitors) of
+ error ->
+ handle_publishing_queue_down(QPid, Reason, State);
+ {ok, ConsumerTag} ->
+ handle_consuming_queue_down(MRef, ConsumerTag, State)
+ end).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -688,9 +686,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid,
- limiter_pid = LimiterPid,
- consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
+ consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -707,18 +705,24 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
- rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), LimiterPid,
- ActualConsumerTag, ExclusiveConsume,
- ok_msg(NoWait, #'basic.consume_ok'{
- consumer_tag = ActualConsumerTag}))
+ {rabbit_amqqueue:basic_consume(
+ Q, NoAck, self(), LimiterPid,
+ ActualConsumerTag, ExclusiveConsume,
+ ok_msg(NoWait, #'basic.consume_ok'{
+ consumer_tag = ActualConsumerTag})),
+ Q}
end) of
- ok ->
- {noreply, State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- QueueName,
- ConsumerMapping)}};
- {error, exclusive_consume_unavailable} ->
+ {ok, Q} ->
+ State1 = State#ch{consumer_mapping =
+ dict:store(ActualConsumerTag,
+ {Q, undefined},
+ ConsumerMapping)},
+ {noreply,
+ case NoWait of
+ true -> monitor_consumer(ActualConsumerTag, State1);
+ false -> State1
+ end};
+ {{error, exclusive_consume_unavailable}, _Q} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
[rabbit_misc:rs(QueueName)])
@@ -731,26 +735,31 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, QueueName} ->
- NewState = State#ch{consumer_mapping =
- dict:erase(ConsumerTag,
- ConsumerMapping)},
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- %% In order to ensure that no more messages
- %% are sent to the consumer after the
- %% cancel_ok has been sent, we get the
- %% queue process to send the cancel_ok on
- %% our behalf. If we were sending the
- %% cancel_ok ourselves it might overtake a
- %% message sent previously by the queue.
+ {ok, {Q, MRef}} ->
+ ConsumerMonitors1 =
+ case MRef of
+ undefined -> ConsumerMonitors;
+ _ -> true = erlang:demonitor(MRef),
+ dict:erase(MRef, ConsumerMonitors)
+ end,
+ NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
+ ConsumerMapping),
+ consumer_monitors = ConsumerMonitors1},
+ %% In order to ensure that no more messages are sent to
+ %% the consumer after the cancel_ok has been sent, we get
+ %% the queue process to send the cancel_ok on our
+ %% behalf. If we were sending the cancel_ok ourselves it
+ %% might overtake a message sent previously by the queue.
+ case rabbit_misc:with_exit_handler(
+ fun () -> {error, not_found} end,
+ fun () ->
rabbit_amqqueue:basic_cancel(
Q, self(), ConsumerTag,
ok_msg(NoWait, #'basic.cancel_ok'{
@@ -1080,6 +1089,52 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ capabilities = Capabilities}) ->
+ case rabbit_misc:table_lookup(
+ Capabilities, <<"consumer_cancel_notify">>) of
+ {bool, true} ->
+ {#amqqueue{pid = QPid} = Q, undefined} =
+ dict:fetch(ConsumerTag, ConsumerMapping),
+ MRef = erlang:monitor(process, QPid),
+ State#ch{consumer_mapping =
+ dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
+ consumer_monitors =
+ dict:store(MRef, ConsumerTag, ConsumerMonitors)};
+ _ ->
+ State
+ end.
+
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
+ MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSet} -> gb_sets:to_list(MsgSet);
+ none -> []
+ end,
+ %% We remove the MsgSeqNos from UQM before calling
+ %% process_confirms to prevent each MsgSeqNo being removed from
+ %% the set one by one which which would be inefficient
+ State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
+ {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
+ erase_queue_stats(QPid),
+ State3 = (case Reason of
+ normal -> fun record_confirms/2;
+ _ -> fun send_nacks/2
+ end)(MXs, State2),
+ queue_blocked(QPid, State3).
+
+handle_consuming_queue_down(MRef, ConsumerTag,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ writer_pid = WriterPid}) ->
+ ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
+ ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors),
+ Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
+ nowait = true},
+ ok = rabbit_writer:send_command(WriterPid, Cancel),
+ State#ch{consumer_mapping = ConsumerMapping1,
+ consumer_monitors = ConsumerMonitors1}.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1250,16 +1305,9 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
consumer_queues(Consumers) ->
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end].
+ lists:usort([QPid ||
+ {_Key, {#amqqueue{pid = QPid}, _MRef}}
+ <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 710e6878..320ce9f8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -165,7 +165,8 @@ server_properties(Protocol) ->
server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
- {<<"basic.nack">>, bool, true}];
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true}];
server_capabilities(_) ->
[].