diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-18 18:28:16 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-18 18:28:16 +0000 |
commit | 0a841c886b0941b534de7f5fb32405e910c44173 (patch) | |
tree | e05b88142fb94f8c96aa2797d46cae2968782dcf | |
parent | d8c2900d40317202aa509ef18116c7058ddc7f16 (diff) | |
download | rabbitmq-server-0a841c886b0941b534de7f5fb32405e910c44173.tar.gz |
Set up the monitor only when we see the basic.consume_ok coming back through otherwise there's a risk of sending out the cancel before we've sent out the consume_ok, which would surprise clients. This is further complicated by the fact NoWait with basic.consume...
-rw-r--r-- | src/rabbit_channel.erl | 74 |
1 files changed, 48 insertions, 26 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ff8ff800..b8788983 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -241,6 +241,11 @@ handle_cast({flushed, QPid}, State) -> handle_cast(terminate, State) -> {stop, normal, State}; +handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, + State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command(WriterPid, Msg), + noreply(monitor_consumer(ConsumerTag, State)); + handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), noreply(State); @@ -656,9 +661,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{reader_pid = ReaderPid, limiter_pid = LimiterPid, - consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors, - capabilities = Capabilities}) -> + consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -682,23 +685,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q = #amqqueue{pid = QPid}} -> - {ConsumerMonitors1, MRef} = - case rabbit_misc:table_lookup( - Capabilities, - <<"consumer_death_notification">>) of - {bool, true} -> - MRef1 = erlang:monitor(process, QPid), - {dict:store(MRef1, ActualConsumerTag, - ConsumerMonitors), MRef1}; - _ -> - {ConsumerMonitors, undefined} - end, - {noreply, State#ch{consumer_mapping = - dict:store(ActualConsumerTag, - {Q, MRef}, - ConsumerMapping), - consumer_monitors = ConsumerMonitors1}}; + {ok, Q} -> + State1 = State#ch{consumer_mapping = + dict:store(ActualConsumerTag, + {Q, undefined}, + ConsumerMapping)}, + {noreply, + maybe_monitor_consumer(NoWait, ActualConsumerTag, State1)}; {{error, exclusive_consume_unavailable}, _Q} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", @@ -734,10 +727,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% the queue process to send the cancel_ok on our %% behalf. If we were sending the cancel_ok ourselves it %% might overtake a message sent previously by the queue. - case rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, - ok_msg(NoWait, #'basic.cancel_ok'{ - consumer_tag = ConsumerTag})) of + case rabbit_misc:with_exit_handler( + fun () -> {error, not_found} end, + fun () -> + rabbit_amqqueue:basic_cancel( + Q, self(), ConsumerTag, + ok_msg(NoWait, #'basic.cancel_ok'{ + consumer_tag = ConsumerTag})) + end) of ok -> {noreply, NewState}; {error, not_found} -> @@ -1062,6 +1059,28 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +maybe_monitor_consumer(true, ConsumerTag, State) -> + monitor_consumer(ConsumerTag, State); +maybe_monitor_consumer(false, _ConsumerTag, State) -> + State. + +monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, + consumer_monitors = ConsumerMonitors, + capabilities = Capabilities}) -> + case {dict:find(ConsumerTag, ConsumerMapping), + rabbit_misc:table_lookup( + Capabilities, <<"consumer_death_notification">>)} of + {{ok, {#amqqueue{pid = QPid} = Q, undefined}}, {bool, true}} -> + MRef = erlang:monitor(process, QPid), + State#ch{consumer_mapping = + dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping), + consumer_monitors = + dict:store(MRef, ConsumerTag, ConsumerMonitors)}; + _X -> + %% either already received the cancel or incapable client + State + end. + handle_non_consumer_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); @@ -1080,13 +1099,16 @@ handle_non_consumer_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> queue_blocked(QPid, State3). handle_consumer_down(MRef, ConsumerTag, - State = #ch{consumer_monitors = ConsumerMonitors, + State = #ch{consumer_mapping = ConsumerMapping, + consumer_monitors = ConsumerMonitors, writer_pid = WriterPid}) -> + ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors), Cancel = #'basic.cancel'{consumer_tag = ConsumerTag, nowait = true}, ok = rabbit_writer:send_command(WriterPid, Cancel), - State#ch{consumer_monitors = ConsumerMonitors1}. + State#ch{consumer_mapping = ConsumerMapping1, + consumer_monitors = ConsumerMonitors1}. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, |