diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-24 14:05:49 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-24 14:05:49 +0100 |
commit | 0c46f034c74a55636c497b4353d5cc0e6d48428f (patch) | |
tree | 91d8537eb40e6e5849344c533c67926e9bdf2474 | |
parent | b6014122cd0c1a1556d07fe5fe80704680c1621a (diff) | |
download | rabbitmq-server-0c46f034c74a55636c497b4353d5cc0e6d48428f.tar.gz |
active is much easier to think about than blocked. Deal with the possibility of receiving various conserve_memory messages whilst waiting for a flow_ok to come back from a client
-rw-r--r-- | src/rabbit_channel.erl | 35 |
1 files changed, 25 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5101cce2..560131be 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -46,7 +46,7 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, blocked}). + consumer_mapping, blocking, active}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -154,7 +154,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = dict:new(), - blocked = false}, + active = true}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -209,11 +209,19 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast({conserve_memory, Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, + State = #ch{active = {pending, Conserve}}) -> + noreply(State#ch{active = {invert, Conserve}}); +handle_cast({conserve_memory, Conserve}, + State = #ch{active = {invert, Conserve}}) -> + noreply(State#ch{active = {pending, Conserve}}); +handle_cast({conserve_memory, Conserve}, State = #ch{active = Conserve}) -> ok = clear_permission_cache(), ok = rabbit_writer:send_command( - State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - noreply(State#ch{blocked = {pending, Conserve}}). + State#ch.writer_pid, #'channel.flow'{active = not Conserve}), + noreply(State#ch{active = {pending, not Conserve}}); +handle_cast({conserve_memory, _Conserve}, State) -> + noreply(State). handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -371,9 +379,12 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{}, _Content, #ch{blocked = true}) -> +handle_method(#'basic.publish'{}, _Content, #ch{active = Active}) + when Active =:= false orelse Active =:= {pending, true} orelse + Active =:= {invert, true} -> rabbit_misc:protocol_error( - precondition_failed, "Publish received after flow_ok", []); + precondition_failed, + "basic.publish received after channel.flow_ok{active=false}", []); handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, @@ -833,9 +844,13 @@ handle_method(#'channel.flow'{active = false}, _, end; handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{blocked = {pending, Conserve}}) - when Active =:= not Conserve -> - {noreply, State#ch{blocked = Conserve}}; + State = #ch{active = {pending, Active}}) -> + noreply(State#ch{active = Active}); +handle_method(#'channel.flow_ok'{active = Active}, _, + State = #ch{active = {invert, Active}}) -> + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = not Active}), + noreply(State#ch{active = {pending, not Active}}); handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( |