diff options
-rw-r--r-- | docs/examples-to-end.xsl | 7 | ||||
-rw-r--r-- | scripts/rabbitmq-server.bat | 3 | ||||
-rw-r--r-- | scripts/rabbitmq-service.bat | 3 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 42 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 194 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 29 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 342 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 32 |
8 files changed, 343 insertions, 309 deletions
diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl index a0a74178..4db1d5c4 100644 --- a/docs/examples-to-end.xsl +++ b/docs/examples-to-end.xsl @@ -2,7 +2,10 @@ <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version='1.0'> -<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" /> +<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" + doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" + indent="yes" +/> <!-- Don't copy examples through in place --> <xsl:template match="*[@role='example-prefix']"/> @@ -27,7 +30,7 @@ <varlistentry> <term><command><xsl:copy-of select="text()"/></command></term> <listitem> - <xsl:copy-of select="following-sibling::para[@role='example']"/> + <xsl:copy-of select="following-sibling::para[@role='example' and preceding-sibling::screen[1] = current()]"/> </listitem> </varlistentry> </xsl:for-each> diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 3049f22d..56bed435 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -126,16 +126,15 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -boot "!RABBITMQ_BOOT_FILE!" ^
!RABBITMQ_CONFIG_ARG! ^
-sname !RABBITMQ_NODENAME! ^
--s rabbit ^
+W w ^
+A30 ^
+P 1048576 ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
--rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
+-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index db9b958e..26c6ea65 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -192,16 +192,15 @@ set ERLANG_SERVICE_ARGUMENTS= ^ !RABBITMQ_EBIN_PATH! ^
-boot "!RABBITMQ_BOOT_FILE!" ^
!RABBITMQ_CONFIG_ARG! ^
--s rabbit ^
+W w ^
+A30 ^
+P 1048576 ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^
--rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
+-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 776ac43a..e14dfe22 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -159,7 +159,8 @@ -define(FILE_HANDLES_CHECK_INTERVAL, 2000). -define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)). --define(CLIENT_ETS_TABLE, ?MODULE). +-define(CLIENT_ETS_TABLE, file_handle_cache_client). +-define(ELDERS_ETS_TABLE, file_handle_cache_elders). %%---------------------------------------------------------------------------- @@ -802,7 +803,8 @@ init([]) -> error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", [Limit, ObtainLimit]), Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), - {ok, #fhc_state { elders = dict:new(), + Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]), + {ok, #fhc_state { elders = Elders, limit = Limit, open_count = 0, open_pending = pending_new(), @@ -818,28 +820,27 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From, elders = Elders, clients = Clients }) when EldestUnusedSince =/= undefined -> - Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + true = ets:insert(Elders, {Pid, EldestUnusedSince}), Item = #pending { kind = open, pid = Pid, requested = Requested, from = From }, ok = track_client(Pid, Clients), - State1 = State #fhc_state { elders = Elders1 }, - case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of + case needs_reduce(State #fhc_state { open_count = Count + Requested }) of true -> case ets:lookup(Clients, Pid) of [#cstate { opened = 0 }] -> true = ets:update_element( Clients, Pid, {#cstate.blocked, true}), {noreply, - reduce(State1 #fhc_state { + reduce(State #fhc_state { open_pending = pending_in(Item, Pending) })}; [#cstate { opened = Opened }] -> true = ets:update_element( Clients, Pid, {#cstate.pending_closes, Opened}), - {reply, close, State1} + {reply, close, State} end; - false -> {noreply, run_pending_item(Item, State1)} + false -> {noreply, run_pending_item(Item, State)} end; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, @@ -887,21 +888,20 @@ handle_cast({register_callback, Pid, MFA}, handle_cast({update, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders }) when EldestUnusedSince =/= undefined -> - Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + true = ets:insert(Elders, {Pid, EldestUnusedSince}), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages - {noreply, State #fhc_state { elders = Elders1 }}; + {noreply, State}; handle_cast({close, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, clients = Clients }) -> - Elders1 = case EldestUnusedSince of - undefined -> dict:erase(Pid, Elders); - _ -> dict:store(Pid, EldestUnusedSince, Elders) - end, + true = case EldestUnusedSince of + undefined -> ets:delete(Elders, Pid); + _ -> ets:insert(Elders, {Pid, EldestUnusedSince}) + end, ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), {noreply, adjust_alarm(State, process_pending( - update_counts(open, Pid, -1, - State #fhc_state { elders = Elders1 })))}; + update_counts(open, Pid, -1, State)))}; handle_cast({transfer, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), @@ -922,6 +922,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, [#cstate { opened = Opened, obtained = Obtained }] = ets:lookup(Clients, Pid), true = ets:delete(Clients, Pid), + true = ets:delete(Elders, Pid), FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, {noreply, adjust_alarm( State, @@ -930,11 +931,12 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, open_count = OpenCount - Opened, open_pending = filter_pending(FilterFun, OpenPending), obtain_count = ObtainCount - Obtained, - obtain_pending = filter_pending(FilterFun, ObtainPending), - elders = dict:erase(Pid, Elders) }))}. + obtain_pending = filter_pending(FilterFun, ObtainPending) }))}. -terminate(_Reason, State = #fhc_state { clients = Clients }) -> +terminate(_Reason, State = #fhc_state { clients = Clients, + elders = Elders }) -> ets:delete(Clients), + ets:delete(Elders), State. code_change(_OldVsn, State, _Extra) -> @@ -1090,7 +1092,7 @@ reduce(State = #fhc_state { open_pending = OpenPending, timer_ref = TRef }) -> Now = now(), {CStates, Sum, ClientCount} = - dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> + ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) -> [#cstate { pending_closes = PendingCloses, opened = Opened, blocked = Blocked } = CState] = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 734b2291..c28cd5bf 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -56,11 +56,11 @@ -record(consumer, {tag, ack_required}). %% These are held in our process dictionary --record(cr, {consumer_count, - ch_pid, - limiter, +-record(cr, {ch_pid, monitor_ref, acktags, + consumer_count, + limiter, is_limit_active, unsent_message_count}). @@ -336,10 +336,10 @@ ch_record(ChPid) -> Key = {ch, ChPid}, case get(Key) of undefined -> MonitorRef = erlang:monitor(process, ChPid), - C = #cr{consumer_count = 0, - ch_pid = ChPid, + C = #cr{ch_pid = ChPid, monitor_ref = MonitorRef, acktags = sets:new(), + consumer_count = 0, is_limit_active = false, limiter = rabbit_limiter:make_token(), unsent_message_count = 0}, @@ -348,18 +348,18 @@ ch_record(ChPid) -> C = #cr{} -> C end. -store_ch_record(C = #cr{ch_pid = ChPid}) -> - put({ch, ChPid}, C). - -maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, - acktags = ChAckTags, - unsent_message_count = UnsentMessageCount}) -> +update_ch_record(C = #cr{consumer_count = ConsumerCount, + acktags = ChAckTags, + unsent_message_count = UnsentMessageCount}) -> case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of - {0, 0, 0} -> ok = erase_ch_record(C), - false; - _ -> store_ch_record(C), - true - end. + {0, 0, 0} -> ok = erase_ch_record(C); + _ -> ok = store_ch_record(C) + end, + C. + +store_ch_record(C = #cr{ch_pid = ChPid}) -> + put({ch, ChPid}, C), + ok. erase_ch_record(#cr{ch_pid = ChPid, limiter = Limiter, @@ -369,6 +369,16 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. +update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> + ok = rabbit_limiter:register(Limiter, self()), + update_ch_record(C#cr{consumer_count = 1}); +update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) -> + ok = rabbit_limiter:unregister(Limiter, self()), + update_ch_record(C#cr{consumer_count = 0, + limiter = rabbit_limiter:make_token()}); +update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> + update_ch_record(C#cr{consumer_count = Count + Delta}). + all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> @@ -406,9 +416,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, true -> sets:add_element(AckTag, ChAckTags); false -> ChAckTags end, - NewC = C#cr{unsent_message_count = Count + 1, - acktags = ChAckTags1}, - true = maybe_store_ch_record(NewC), + NewC = update_ch_record( + C#cr{unsent_message_count = Count + 1, + acktags = ChAckTags1}), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), @@ -426,7 +436,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> - true = maybe_store_ch_record(C#cr{is_limit_active = true}), + update_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, @@ -560,11 +570,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), case Delivered of true -> State2; - false -> BQS1 = - BQ:publish(Message, - (message_properties(State)) #message_properties{ - needs_confirming = needs_confirming(Confirm)}, - ChPid, BQS), + false -> Props = (message_properties(State)) #message_properties{ + needs_confirming = needs_confirming(Confirm)}, + BQS1 = BQ:publish(Message, Props, ChPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. @@ -584,8 +592,8 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> - queue:filter(fun ({CP, #consumer{tag = CT}}) -> - (CP /= ChPid) or (CT /= ConsumerTag) + queue:filter(fun ({CP, #consumer{tag = CTag}}) -> + (CP /= ChPid) or (CTag /= ConsumerTag) end, Queue). remove_consumers(ChPid, Queue) -> @@ -608,8 +616,7 @@ possibly_unblock(State, ChPid, Update) -> not_found -> State; C -> - NewC = Update(C), - maybe_store_ch_record(NewC), + NewC = update_ch_record(Update(C)), case ch_record_state_transition(C, NewC) of ok -> State; unblock -> {NewBlockedConsumers, NewActiveConsumers} = @@ -648,11 +655,6 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> end end. -cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> - none; -cancel_holder(_ChPid, _ConsumerTag, Holder) -> - Holder. - check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; check_exclusive_access(none, false, _State) -> @@ -678,8 +680,15 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). -subtract_acks(A, B) when is_list(B) -> - lists:foldl(fun sets:del_element/2, A, B). +subtract_acks(ChPid, AckTags, State, Fun) -> + case lookup_ch(ChPid) of + not_found -> + State; + C = #cr{acktags = ChAckTags} -> + update_ch_record(C#cr{acktags = lists:foldl(fun sets:del_element/2, + ChAckTags, AckTags)}), + Fun(State) + end. discard_delivery(#delivery{sender = ChPid, message = Message}, @@ -946,10 +955,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, State3 = case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - true = maybe_store_ch_record( - C#cr{acktags = - sets:add_element(AckTag, - ChAckTags)}), + ChAckTags1 = sets:add_element(AckTag, ChAckTags), + update_ch_record(C#cr{acktags = ChAckTags1}), State2; false -> State2 end, @@ -965,16 +972,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + C = ch_record(ChPid), + C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - true = maybe_store_ch_record( - C#cr{consumer_count = ConsumerCount +1, - limiter = Limiter}), - ok = case ConsumerCount of - 0 -> rabbit_limiter:register(Limiter, self()); - _ -> ok - end, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> ExistingHolder end, @@ -982,7 +983,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), State2 = - case is_ch_blocked(C) of + case is_ch_blocked(C1) of true -> State1#q{ blocked_consumers = add_consumer(ChPid, Consumer, @@ -1000,34 +1001,27 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> + ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of not_found -> - ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumer_count = ConsumerCount, - limiter = Limiter} -> - C1 = C#cr{consumer_count = ConsumerCount -1}, - maybe_store_ch_record( - case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(Limiter, self()), - C1#cr{limiter = rabbit_limiter:make_token()}; - _ -> C1 - end), + C -> + update_consumer_count(C, -1), emit_consumer_deleted(ChPid, ConsumerTag), - ok = maybe_send_reply(ChPid, OkMsg), - NewState = - State#q{exclusive_consumer = cancel_holder(ChPid, - ConsumerTag, - Holder), - active_consumers = remove_consumer( - ChPid, ConsumerTag, - State#q.active_consumers), - blocked_consumers = remove_consumer( + State1 = State#q{ + exclusive_consumer = case Holder of + {ChPid, ConsumerTag} -> none; + _ -> Holder + end, + active_consumers = remove_consumer( + ChPid, ConsumerTag, + State#q.active_consumers), + blocked_consumers = remove_consumer( ChPid, ConsumerTag, State#q.blocked_consumers)}, - case should_auto_delete(NewState) of - false -> reply(ok, ensure_expiry_timer(NewState)); - true -> {stop, normal, ok, NewState} + case should_auto_delete(State1) of + false -> reply(ok, ensure_expiry_timer(State1)); + true -> {stop, normal, ok, State1} end end; @@ -1057,14 +1051,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(requeue_and_run(AckTags, State)) - end. + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1) -> requeue_and_run(AckTags, State1) end)). handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -1073,33 +1062,26 @@ handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. noreply(deliver_or_enqueue(Delivery, State)); -handle_cast({ack, AckTags, ChPid}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - maybe_store_ch_record(C#cr{acktags = subtract_acks( - ChAckTags, AckTags)}), - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - noreply(State#q{backing_queue_state = BQS1}) - end; - -handle_cast({reject, AckTags, Requeue, ChPid}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(case Requeue of - true -> requeue_and_run(AckTags, State); - false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State#q{backing_queue_state = BQS1} - end) - end; +handle_cast({ack, AckTags, ChPid}, State) -> + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end)); + +handle_cast({reject, AckTags, Requeue, ChPid}, State) -> + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + case Requeue of + true -> requeue_and_run(AckTags, State1); + false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end + end)); handle_cast(delete_immediately, State) -> {stop, normal, State}; diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 205d5bba..43c26941 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -108,21 +108,34 @@ recover(XNames, QNames) -> SelectSet = fun (#resource{kind = exchange}) -> XNameSet; (#resource{kind = queue}) -> QNameSet end, - [recover_semi_durable_route(R, SelectSet(Dst)) || + {ok, Gatherer} = gatherer:start_link(), + [recover_semi_durable_route(Gatherer, R, SelectSet(Dst)) || R = #route{binding = #binding{destination = Dst}} <- rabbit_misc:dirty_read_all(rabbit_semi_durable_route)], + empty = gatherer:out(Gatherer), + ok = gatherer:stop(Gatherer), ok. -recover_semi_durable_route(R = #route{binding = B}, ToRecover) -> +recover_semi_durable_route(Gatherer, R = #route{binding = B}, ToRecover) -> #binding{source = Src, destination = Dst} = B, - {ok, X} = rabbit_exchange:lookup(Src), + case sets:is_element(Dst, ToRecover) of + true -> {ok, X} = rabbit_exchange:lookup(Src), + ok = gatherer:fork(Gatherer), + ok = worker_pool:submit_async( + fun () -> + recover_semi_durable_route_txn(R, X), + gatherer:finish(Gatherer) + end); + false -> ok + end. + +recover_semi_durable_route_txn(R = #route{binding = B}, X) -> rabbit_misc:execute_mnesia_transaction( fun () -> - Rs = mnesia:match_object(rabbit_semi_durable_route, R, read), - case Rs =/= [] andalso sets:is_element(Dst, ToRecover) of - false -> no_recover; - true -> ok = sync_transient_route(R, fun mnesia:write/3), - rabbit_exchange:serial(X) + case mnesia:match_object(rabbit_semi_durable_route, R, read) of + [] -> no_recover; + _ -> ok = sync_transient_route(R, fun mnesia:write/3), + rabbit_exchange:serial(X) end end, fun (no_recover, _) -> ok; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dfe84644..d2f55277 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,8 +35,8 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_ack_q, - user, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, consumer_monitors, queue_collector_pid, + user, virtual_host, most_recently_declared_queue, queue_monitors, + consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, confirmed, capabilities, trace_state}). @@ -189,9 +189,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, + queue_monitors = dict:new(), consumer_mapping = dict:new(), - blocking = dict:new(), - consumer_monitors = dict:new(), + blocking = sets:new(), + queue_consumers = dict:new(), queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, @@ -275,7 +276,7 @@ handle_cast(terminate, State) -> handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - noreply(monitor_consumer(ConsumerTag, State)); + noreply(consumer_monitor(ConsumerTag, State)); handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), @@ -299,13 +300,13 @@ handle_cast({deliver, ConsumerTag, AckRequired, exchange = ExchangeName#resource.name, routing_key = RoutingKey}, rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - maybe_incr_stats([{QPid, 1}], case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State), - maybe_incr_redeliver_stats(Redelivered, QPid, State), + State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State1), + State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), rabbit_trace:tap_trace_out(Msg, TraceState), - noreply(State1#ch{next_tag = DeliveryTag + 1}); + noreply(State3#ch{next_tag = DeliveryTag + 1}); handle_cast(force_event_refresh, State) -> @@ -323,15 +324,13 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> noreply([ensure_stats_timer], State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); -handle_info({'DOWN', MRef, process, QPid, Reason}, - State = #ch{consumer_monitors = ConsumerMonitors}) -> - noreply( - case dict:find(MRef, ConsumerMonitors) of - error -> - handle_publishing_queue_down(QPid, Reason, State); - {ok, ConsumerTag} -> - handle_consuming_queue_down(MRef, ConsumerTag, State) - end); +handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> + State1 = handle_publishing_queue_down(QPid, Reason, State), + State2 = queue_blocked(QPid, State1), + State3 = handle_consuming_queue_down(QPid, State2), + erase_queue_stats(QPid), + noreply(State3#ch{queue_monitors = + dict:erase(QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -516,17 +515,16 @@ check_name(_Kind, NameBin) -> NameBin. queue_blocked(QPid, State = #ch{blocking = Blocking}) -> - case dict:find(QPid, Blocking) of - error -> State; - {ok, MRef} -> true = erlang:demonitor(MRef), - Blocking1 = dict:erase(QPid, Blocking), - ok = case dict:size(Blocking1) of - 0 -> rabbit_writer:send_command( - State#ch.writer_pid, - #'channel.flow_ok'{active = false}); - _ -> ok - end, - State#ch{blocking = Blocking1} + case sets:is_element(QPid, Blocking) of + false -> State; + true -> Blocking1 = sets:del_element(QPid, Blocking), + ok = case sets:size(Blocking1) of + 0 -> rabbit_writer:send_command( + State#ch.writer_pid, + #'channel.flow_ok'{active = false}); + _ -> ok + end, + demonitor_queue(QPid, State#ch{blocking = Blocking1}) end. record_confirm(undefined, _, State) -> @@ -545,38 +543,41 @@ confirm(MsgSeqNos, QPid, State) -> {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State), record_confirms(MXs, State1). -process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}) -> - {MXs, UMQ1, UQM1} = - lists:foldl( - fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UMQ0) of - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack); - none -> Acc - end - end, {[], UMQ, UQM}, MsgSeqNos), - {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. - -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) -> - UQM1 = case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), - case gb_sets:is_empty(MsgSeqNos1) of - true -> gb_trees:delete(QPid, UQM); - false -> gb_trees:update(QPid, MsgSeqNos1, UQM) - end; - none -> - UQM - end, +process_confirms(MsgSeqNos, QPid, Nack, State) -> + lists:foldl( + fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UMQ0) of + {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, + Acc, Nack); + none -> Acc + end + end, {[], State}, MsgSeqNos). + +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, + {MXs, State = #ch{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM}}, + Nack) -> + State1 = case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), + case gb_sets:is_empty(MsgSeqNos1) of + true -> UQM1 = gb_trees:delete(QPid, UQM), + demonitor_queue( + QPid, State#ch{unconfirmed_qm = UQM1}); + false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), + State#ch{unconfirmed_qm = UQM1} + end; + none -> + State + end, Qs1 = gb_sets:del_element(QPid, Qs), %% If QPid somehow died initiating a nack, clear the message from %% internal data-structures. Also, cleanup empty entries. case (Nack orelse gb_sets:is_empty(Qs1)) of - true -> - {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; - false -> - {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} + true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ), + {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}}; + false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), + {MXs, State1#ch{unconfirmed_mq = UMQ1}} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -693,11 +694,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), State), - maybe_incr_stats([{QPid, 1}], case NoAck of - true -> get_no_ack; - false -> get - end, State), - maybe_incr_redeliver_stats(Redelivered, QPid, State), + State2 = maybe_incr_stats([{QPid, 1}], case NoAck of + true -> get_no_ack; + false -> get + end, State1), + State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), rabbit_trace:tap_trace_out(Msg, TraceState), ok = rabbit_writer:send_command( WriterPid, @@ -707,7 +708,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State1#ch{next_tag = DeliveryTag + 1}}; + {noreply, State3#ch{next_tag = DeliveryTag + 1}}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -746,12 +747,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end) of {ok, Q} -> State1 = State#ch{consumer_mapping = - dict:store(ActualConsumerTag, - {Q, undefined}, + dict:store(ActualConsumerTag, Q, ConsumerMapping)}, {noreply, case NoWait of - true -> monitor_consumer(ActualConsumerTag, State1); + true -> consumer_monitor(ActualConsumerTag, State1); false -> State1 end}; {{error, exclusive_consume_unavailable}, _Q} -> @@ -768,22 +768,26 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, _, State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors}) -> + queue_consumers = QCons}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); - {ok, {Q, MRef}} -> - ConsumerMonitors1 = - case MRef of - undefined -> ConsumerMonitors; - _ -> true = erlang:demonitor(MRef), - dict:erase(MRef, ConsumerMonitors) + {ok, Q = #amqqueue{pid = QPid}} -> + ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), + QCons1 = + case dict:find(QPid, QCons) of + error -> QCons; + {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags), + case gb_sets:is_empty(CTags1) of + true -> dict:erase(QPid, QCons); + false -> dict:store(QPid, CTags1, QCons) + end end, - NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag, - ConsumerMapping), - consumer_monitors = ConsumerMonitors1}, + NewState = demonitor_queue( + Q, State#ch{consumer_mapping = ConsumerMapping1, + queue_consumers = QCons1}), %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get %% the queue process to send the cancel_ok on our @@ -1108,10 +1112,12 @@ handle_method(#'channel.flow'{active = false}, _, ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || - QPid <- QPids], + QPids -> State2 = lists:foldl(fun monitor_queue/2, + State1#ch{blocking = + sets:from_list(QPids)}, + QPids), ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, State1#ch{blocking = dict:from_list(Queues)}} + {noreply, State2} end; handle_method(_MethodRecord, _Content, _State) -> @@ -1120,23 +1126,51 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors, - capabilities = Capabilities}) -> +consumer_monitor(ConsumerTag, + State = #ch{consumer_mapping = ConsumerMapping, + queue_consumers = QCons, + capabilities = Capabilities}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> - {#amqqueue{pid = QPid} = Q, undefined} = - dict:fetch(ConsumerTag, ConsumerMapping), - MRef = erlang:monitor(process, QPid), - State#ch{consumer_mapping = - dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping), - consumer_monitors = - dict:store(MRef, ConsumerTag, ConsumerMonitors)}; + #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), + QCons1 = dict:update(QPid, + fun (CTags) -> + gb_sets:insert(ConsumerTag, CTags) + end, + gb_sets:singleton(ConsumerTag), + QCons), + monitor_queue(QPid, State#ch{queue_consumers = QCons1}); _ -> State end. +monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> + case (not dict:is_key(QPid, QMons) andalso + queue_monitor_needed(QPid, State)) of + true -> MRef = erlang:monitor(process, QPid), + State#ch{queue_monitors = dict:store(QPid, MRef, QMons)}; + false -> State + end. + +demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> + case (dict:is_key(QPid, QMons) andalso + not queue_monitor_needed(QPid, State)) of + true -> true = erlang:demonitor(dict:fetch(QPid, QMons)), + State#ch{queue_monitors = dict:erase(QPid, QMons)}; + false -> State + end. + +queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer, + queue_consumers = QCons, + blocking = Blocking, + unconfirmed_qm = UQM}) -> + StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, + ConsumerMonitored = dict:is_key(QPid, QCons), + QueueBlocked = sets:is_element(QPid, Blocking), + ConfirmMonitored = gb_trees:is_defined(QPid, UQM), + StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); @@ -1157,21 +1191,25 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> {true, fun send_nacks/2} end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), - erase_queue_stats(QPid), - State3 = SendFun(MXs, State2), - queue_blocked(QPid, State3). - -handle_consuming_queue_down(MRef, ConsumerTag, - State = #ch{consumer_mapping = ConsumerMapping, - consumer_monitors = ConsumerMonitors, - writer_pid = WriterPid}) -> - ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), - ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors), - Cancel = #'basic.cancel'{consumer_tag = ConsumerTag, - nowait = true}, - ok = rabbit_writer:send_command(WriterPid, Cancel), + SendFun(MXs, State2). + +handle_consuming_queue_down(QPid, + State = #ch{consumer_mapping = ConsumerMapping, + queue_consumers = QCons, + writer_pid = WriterPid}) -> + ConsumerTags = case dict:find(QPid, QCons) of + error -> gb_sets:new(); + {ok, CTags} -> CTags + end, + ConsumerMapping1 = + gb_sets:fold(fun (CTag, CMap) -> + Cancel = #'basic.cancel'{consumer_tag = CTag, + nowait = true}, + ok = rabbit_writer:send_command(WriterPid, Cancel), + dict:erase(CTag, CMap) + end, ConsumerMapping, ConsumerTags), State#ch{consumer_mapping = ConsumerMapping1, - consumer_monitors = ConsumerMonitors1}. + queue_consumers = dict:erase(QPid, QCons)}. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, @@ -1271,9 +1309,8 @@ ack(Acked, State) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), [{QPid, length(MsgIds)} | L] end, [], Acked), - maybe_incr_stats(QIncs, ack, State), ok = notify_limiter(State#ch.limiter, Acked), - State. + maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), uncommitted_ack_q = queue:new()}. @@ -1307,8 +1344,7 @@ limit_queues(Limiter, #ch{consumer_mapping = Consumers}) -> consumer_queues(Consumers) -> lists:usort([QPid || - {_Key, {#amqqueue{pid = QPid}, _MRef}} - <- dict:to_list(Consumers)]). + {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for @@ -1334,38 +1370,37 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ XName, MsgSeqNo, Message, State), maybe_incr_stats([{XName, 1} | [{{QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State1), - State1. + QPid <- DeliveredQPids]], publish, State1). process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_unroutable, State), - record_confirm(MsgSeqNo, XName, State); + record_confirm(MsgSeqNo, XName, + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_unroutable, State)); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), - maybe_incr_stats([{XName, 1}], return_not_delivered, State), - record_confirm(MsgSeqNo, XName, State); + record_confirm(MsgSeqNo, XName, + maybe_incr_stats([{XName, 1}], return_not_delivered, State)); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State, + #ch{unconfirmed_mq = UMQ} = State, UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), SingletonSet = gb_sets:singleton(MsgSeqNo), - UQM1 = lists:foldl( - fun (QPid, UQM2) -> - maybe_monitor(QPid), - case gb_trees:lookup(QPid, UQM2) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - gb_trees:update(QPid, MsgSeqNos1, UQM2); - none -> - gb_trees:insert(QPid, SingletonSet, UQM2) - end - end, UQM, QPids), - State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. + lists:foldl( + fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) -> + case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), + UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), + State0#ch{unconfirmed_qm = UQM1}; + none -> + UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), + monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1}) + end + end, State#ch{unconfirmed_mq = UMQ1}, QPids). lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1385,11 +1420,13 @@ send_nacks(_, State) -> maybe_complete_tx(State#ch{tx_status = failed}). send_confirms(State = #ch{tx_status = none, confirmed = C}) -> - C1 = lists:append(C), - MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), - MsgSeqNo - end || {MsgSeqNo, ExchangeName} <- C1 ], - send_confirms(MsgSeqNos, State #ch{confirmed = []}); + {MsgSeqNos, State1} = + lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) -> + {[MsgSeqNo | MSNs], + maybe_incr_stats([{ExchangeName, 1}], confirm, + State0)} + end, {[], State}, lists:append(C)), + send_confirms(MsgSeqNos, State1 #ch{confirmed = []}); send_confirms(State) -> maybe_complete_tx(State). @@ -1469,30 +1506,26 @@ i(Item, _) -> maybe_incr_redeliver_stats(true, QPid, State) -> maybe_incr_stats([{QPid, 1}], redeliver, State); -maybe_incr_redeliver_stats(_, _, _) -> - ok. +maybe_incr_redeliver_stats(_, _, State) -> + State. -maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> +maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) -> case rabbit_event:stats_level(StatsTimer) of - fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; - _ -> ok + fine -> lists:foldl(fun ({QX, Inc}, State0) -> + incr_stats(QX, Inc, Measure, State0) + end, State, QXIncs); + _ -> State end. -incr_stats({QPid, _} = QX, Inc, Measure) -> - maybe_monitor(QPid), - update_measures(queue_exchange_stats, QX, Inc, Measure); -incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> - maybe_monitor(QPid), - update_measures(queue_stats, QPid, Inc, Measure); -incr_stats(X, Inc, Measure) -> - update_measures(exchange_stats, X, Inc, Measure). - -maybe_monitor(QPid) -> - case get({monitoring, QPid}) of - undefined -> erlang:monitor(process, QPid), - put({monitoring, QPid}, true); - _ -> ok - end. +incr_stats({QPid, _} = QX, Inc, Measure, State) -> + update_measures(queue_exchange_stats, QX, Inc, Measure), + monitor_queue(QPid, State); +incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) -> + update_measures(queue_stats, QPid, Inc, Measure), + monitor_queue(QPid, State); +incr_stats(X, Inc, Measure, State) -> + update_measures(exchange_stats, X, Inc, Measure), + State. update_measures(Type, QX, Inc, Measure) -> Measures = case get({Type, QX}) of @@ -1528,7 +1561,6 @@ emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> end. erase_queue_stats(QPid) -> - erase({monitoring, QPid}), erase({queue_stats, QPid}), [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3822aaeb..b4871cef 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -502,20 +502,7 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame, self(), Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - channel_cleanup(ChPid), - State; - {method, MethodName, _} -> - case (State#v1.connection_state =:= blocking - andalso - Protocol:method_has_content(MethodName)) of - true -> State#v1{connection_state = blocked}; - false -> State - end; - _ -> - State - end; + post_process_frame(AnalyzedFrame, ChPid, State); undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -527,6 +514,23 @@ handle_frame(Type, Channel, Payload, end end. +post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> + channel_cleanup(ChPid), + State; +post_process_frame({method, MethodName, _}, _ChPid, + State = #v1{connection = #connection{ + protocol = Protocol}}) -> + case Protocol:method_has_content(MethodName) of + true -> erlang:bump_reductions(2000), + case State#v1.connection_state of + blocking -> State#v1{connection_state = blocked}; + _ -> State + end; + false -> State + end; +post_process_frame(_Frame, _ChPid, State) -> + State. + handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, |