summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-18 18:28:16 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-18 18:28:16 +0000
commit0a841c886b0941b534de7f5fb32405e910c44173 (patch)
treee05b88142fb94f8c96aa2797d46cae2968782dcf
parentd8c2900d40317202aa509ef18116c7058ddc7f16 (diff)
downloadrabbitmq-server-0a841c886b0941b534de7f5fb32405e910c44173.tar.gz
Set up the monitor only when we see the basic.consume_ok coming back through otherwise there's a risk of sending out the cancel before we've sent out the consume_ok, which would surprise clients. This is further complicated by the fact NoWait with basic.consume...
-rw-r--r--src/rabbit_channel.erl74
1 files changed, 48 insertions, 26 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ff8ff800..b8788983 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -241,6 +241,11 @@ handle_cast({flushed, QPid}, State) ->
handle_cast(terminate, State) ->
{stop, normal, State};
+handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
+ State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(WriterPid, Msg),
+ noreply(monitor_consumer(ConsumerTag, State));
+
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
noreply(State);
@@ -656,9 +661,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{reader_pid = ReaderPid,
limiter_pid = LimiterPid,
- consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- capabilities = Capabilities}) ->
+ consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -682,23 +685,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q = #amqqueue{pid = QPid}} ->
- {ConsumerMonitors1, MRef} =
- case rabbit_misc:table_lookup(
- Capabilities,
- <<"consumer_death_notification">>) of
- {bool, true} ->
- MRef1 = erlang:monitor(process, QPid),
- {dict:store(MRef1, ActualConsumerTag,
- ConsumerMonitors), MRef1};
- _ ->
- {ConsumerMonitors, undefined}
- end,
- {noreply, State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- {Q, MRef},
- ConsumerMapping),
- consumer_monitors = ConsumerMonitors1}};
+ {ok, Q} ->
+ State1 = State#ch{consumer_mapping =
+ dict:store(ActualConsumerTag,
+ {Q, undefined},
+ ConsumerMapping)},
+ {noreply,
+ maybe_monitor_consumer(NoWait, ActualConsumerTag, State1)};
{{error, exclusive_consume_unavailable}, _Q} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -734,10 +727,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
%% the queue process to send the cancel_ok on our
%% behalf. If we were sending the cancel_ok ourselves it
%% might overtake a message sent previously by the queue.
- case rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag,
- ok_msg(NoWait, #'basic.cancel_ok'{
- consumer_tag = ConsumerTag})) of
+ case rabbit_misc:with_exit_handler(
+ fun () -> {error, not_found} end,
+ fun () ->
+ rabbit_amqqueue:basic_cancel(
+ Q, self(), ConsumerTag,
+ ok_msg(NoWait, #'basic.cancel_ok'{
+ consumer_tag = ConsumerTag}))
+ end) of
ok ->
{noreply, NewState};
{error, not_found} ->
@@ -1062,6 +1059,28 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+maybe_monitor_consumer(true, ConsumerTag, State) ->
+ monitor_consumer(ConsumerTag, State);
+maybe_monitor_consumer(false, _ConsumerTag, State) ->
+ State.
+
+monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ capabilities = Capabilities}) ->
+ case {dict:find(ConsumerTag, ConsumerMapping),
+ rabbit_misc:table_lookup(
+ Capabilities, <<"consumer_death_notification">>)} of
+ {{ok, {#amqqueue{pid = QPid} = Q, undefined}}, {bool, true}} ->
+ MRef = erlang:monitor(process, QPid),
+ State#ch{consumer_mapping =
+ dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
+ consumer_monitors =
+ dict:store(MRef, ConsumerTag, ConsumerMonitors)};
+ _X ->
+ %% either already received the cancel or incapable client
+ State
+ end.
+
handle_non_consumer_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1080,13 +1099,16 @@ handle_non_consumer_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
queue_blocked(QPid, State3).
handle_consumer_down(MRef, ConsumerTag,
- State = #ch{consumer_monitors = ConsumerMonitors,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
writer_pid = WriterPid}) ->
+ ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors),
Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
nowait = true},
ok = rabbit_writer:send_command(WriterPid, Cancel),
- State#ch{consumer_monitors = ConsumerMonitors1}.
+ State#ch{consumer_mapping = ConsumerMapping1,
+ consumer_monitors = ConsumerMonitors1}.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,