diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-05-26 10:38:42 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-05-26 10:38:42 +0100 |
commit | ae170538e18ccdd785253f1891d69171dfca6344 (patch) | |
tree | 87c7693794cba75cdd8afcfc1ed391d9d2007f7e | |
parent | e649f863f74aae26c5e63061289eb4b3b14255d6 (diff) | |
download | rabbitmq-server-ae170538e18ccdd785253f1891d69171dfca6344.tar.gz |
improve flow control logic
- hopefully easier to understand
- handle unexpected channel.flow-ok with an explicit error
-rw-r--r-- | src/rabbit_channel.erl | 134 |
1 files changed, 66 insertions, 68 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index edae3305..3a1051e3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -48,7 +48,9 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, active}). + consumer_mapping, blocking, flow}). + +-record(flow, {server, client, pending}). -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds @@ -163,7 +165,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = dict:new(), - active = true}, + flow = #flow{server = true, client = true, + pending = none}}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -221,10 +224,11 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, handle_cast({conserve_memory, Conserve}, State) -> flow_control(not Conserve, State); -handle_cast({flow_timeout, Ref}, #ch{active = {_Atom, Active, {Ref, _TRef}}}) -> +handle_cast({flow_timeout, Ref}, #ch{flow = #flow{client = ClientFlow, + pending = {Ref, _TRef}}}) -> rabbit_misc:protocol_error( precondition_failed, - "timeout waiting for channel.flow_ok{active=~w}", [Active]); + "timeout waiting for channel.flow_ok{active=~w}", [not ClientFlow]); handle_cast({flow_timeout, _Ref}, State) -> noreply(State). @@ -384,61 +388,49 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; +handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) -> + rabbit_misc:protocol_error( + precondition_failed, + "basic.publish received after channel.flow_ok{active=false}", []); handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, transaction_id = TxnKey, - writer_pid = WriterPid, - active = Active}) -> - case (case Active of - false -> false; - {_Atom, true, _Refs} -> false; - _ -> true - end) of - false -> - rabbit_misc:protocol_error( - precondition_failed, - "basic.publish received after channel.flow_ok{active=false}", []); - true -> - ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_write_permitted(ExchangeName, State), - Exchange = rabbit_exchange:lookup_or_die(ExchangeName), - %% We decode the content's properties here because we're - %% almost certain to want to look at delivery-mode and - %% priority. - DecodedContent = - rabbit_binary_parser:ensure_content_decoded(Content), - IsPersistent = is_message_persistent(DecodedContent), - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent}, - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish( - Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), - case RoutingRes of - routed -> - ok; - unroutable -> - %% FIXME: 312 should be replaced by the ?NO_ROUTE - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 312, - <<"unroutable">>); - not_delivered -> - %% FIXME: 313 should be replaced by the - %% ?NO_CONSUMERS definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 313, - <<"not_delivered">>) - end, - {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) - end} - end; + writer_pid = WriterPid}) -> + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_write_permitted(ExchangeName, State), + Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + %% We decode the content's properties here because we're almost + %% certain to want to look at delivery-mode and priority. + DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), + IsPersistent = is_message_persistent(DecodedContent), + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), + case RoutingRes of + routed -> + ok; + unroutable -> + %% FIXME: 312 should be replaced by the ?NO_ROUTE + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 312, <<"unroutable">>); + not_delivered -> + %% FIXME: 313 should be replaced by the + %% ?NO_CONSUMERS definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>) + end, + {noreply, case TxnKey of + none -> State; + _ -> add_tx_participants(DeliveredQPids, State) + end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -840,7 +832,6 @@ handle_method(#'channel.flow'{active = true}, _, end, {reply, #'channel.flow_ok'{active = true}, State#ch{limiter_pid = LimiterPid1}}; - handle_method(#'channel.flow'{active = false}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> @@ -859,13 +850,24 @@ handle_method(#'channel.flow'{active = false}, _, end; handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{active = {pending, Active, {_Ref, TRef}}}) -> + State = #ch{flow = #flow{server = Active, client = Flow, + pending = {_Ref, TRef}}}) + when Flow =:= not Active -> {ok, cancel} = timer:cancel(TRef), - {noreply, State#ch{active = Active}}; + {noreply, State#ch{flow = #flow{client = Active, pending = none}}}; handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{active = {invert, Active, {_Ref, TRef}}}) -> + State = #ch{flow = #flow{server = Flow, client = Flow, + pending = {_Ref, TRef}}}) + when Flow =:= not Active -> {ok, cancel} = timer:cancel(TRef), - {noreply, issue_flow(not Active, State)}; + {noreply, issue_flow(Flow, State)}; +handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) -> + rabbit_misc:protocol_error( + command_invalid, "unsolicited channel.flow_ok", []); +handle_method(#'channel.flow_ok'{active = Active}, _, _State) -> + rabbit_misc:protocol_error( + command_invalid, + "received channel.flow_ok{active=~w} has incorrect polarity", [Active]); handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -873,17 +875,12 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -flow_control(Active, State = #ch{active = {pending, NotActive, Refs}}) - when NotActive =:= not Active -> - noreply(State#ch{active = {invert, NotActive, Refs}}); -flow_control(Active, State = #ch{active = {invert, Active, Refs}}) -> - noreply(State#ch{active = {pending, Active, Refs}}); -flow_control(Active, State = #ch{active = NotActive}) - when NotActive =:= not Active -> +flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}}) + when Flow =:= not Active -> ok = clear_permission_cache(), noreply(issue_flow(Active, State)); -flow_control(_Active, State) -> - noreply(State). +flow_control(Active, State = #ch{flow = F}) -> + noreply(State#ch{flow = F#flow{server = Active}}). issue_flow(Active, State) -> ok = rabbit_writer:send_command( @@ -891,7 +888,8 @@ issue_flow(Active, State) -> Ref = make_ref(), {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, [self(), Ref]), - State#ch{active = {pending, Active, {Ref, TRef}}}. + State#ch{flow = #flow{server = Active, client = not Active, + pending = {Ref, TRef}}}. binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> |