diff options
authorMatthias Radestock <>2010-05-26 10:38:42 +0100
committerMatthias Radestock <>2010-05-26 10:38:42 +0100
commitae170538e18ccdd785253f1891d69171dfca6344 (patch)
parente649f863f74aae26c5e63061289eb4b3b14255d6 (diff)
improve flow control logic
- hopefully easier to understand - handle unexpected channel.flow-ok with an explicit error
1 files changed, 66 insertions, 68 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index edae3305..3a1051e3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -48,7 +48,9 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, active}).
+ consumer_mapping, blocking, flow}).
+-record(flow, {server, client, pending}).
-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
@@ -163,7 +165,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
- active = true},
+ flow = #flow{server = true, client = true,
+ pending = none}},
@@ -221,10 +224,11 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
handle_cast({conserve_memory, Conserve}, State) ->
flow_control(not Conserve, State);
-handle_cast({flow_timeout, Ref}, #ch{active = {_Atom, Active, {Ref, _TRef}}}) ->
+handle_cast({flow_timeout, Ref}, #ch{flow = #flow{client = ClientFlow,
+ pending = {Ref, _TRef}}}) ->
- "timeout waiting for channel.flow_ok{active=~w}", [Active]);
+ "timeout waiting for channel.flow_ok{active=~w}", [not ClientFlow]);
handle_cast({flow_timeout, _Ref}, State) ->
@@ -384,61 +388,49 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
+handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "basic.publish received after channel.flow_ok{active=false}", []);
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
transaction_id = TxnKey,
- writer_pid = WriterPid,
- active = Active}) ->
- case (case Active of
- false -> false;
- {_Atom, true, _Refs} -> false;
- _ -> true
- end) of
- false ->
- rabbit_misc:protocol_error(
- precondition_failed,
- "basic.publish received after channel.flow_ok{active=false}", []);
- true ->
- ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_write_permitted(ExchangeName, State),
- Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
- %% We decode the content's properties here because we're
- %% almost certain to want to look at delivery-mode and
- %% priority.
- DecodedContent =
- rabbit_binary_parser:ensure_content_decoded(Content),
- IsPersistent = is_message_persistent(DecodedContent),
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
- case RoutingRes of
- routed ->
- ok;
- unroutable ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312,
- <<"unroutable">>);
- not_delivered ->
- %% FIXME: 313 should be replaced by the
- %% ?NO_CONSUMERS definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313,
- <<"not_delivered">>)
- end,
- {noreply, case TxnKey of
- none -> State;
- _ -> add_tx_participants(DeliveredQPids, State)
- end}
- end;
+ writer_pid = WriterPid}) ->
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_write_permitted(ExchangeName, State),
+ Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+ %% We decode the content's properties here because we're almost
+ %% certain to want to look at delivery-mode and priority.
+ DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
+ IsPersistent = is_message_persistent(DecodedContent),
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = DecodedContent,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent},
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
+ case RoutingRes of
+ routed ->
+ ok;
+ unroutable ->
+ %% FIXME: 312 should be replaced by the ?NO_ROUTE
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
+ not_delivered ->
+ %% FIXME: 313 should be replaced by the
+ %% ?NO_CONSUMERS definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
+ end,
+ {noreply, case TxnKey of
+ none -> State;
+ _ -> add_tx_participants(DeliveredQPids, State)
+ end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -840,7 +832,6 @@ handle_method(#'channel.flow'{active = true}, _,
{reply, #'channel.flow_ok'{active = true},
State#ch{limiter_pid = LimiterPid1}};
handle_method(#'channel.flow'{active = false}, _,
State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->
@@ -859,13 +850,24 @@ handle_method(#'channel.flow'{active = false}, _,
handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{active = {pending, Active, {_Ref, TRef}}}) ->
+ State = #ch{flow = #flow{server = Active, client = Flow,
+ pending = {_Ref, TRef}}})
+ when Flow =:= not Active ->
{ok, cancel} = timer:cancel(TRef),
- {noreply, State#ch{active = Active}};
+ {noreply, State#ch{flow = #flow{client = Active, pending = none}}};
handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{active = {invert, Active, {_Ref, TRef}}}) ->
+ State = #ch{flow = #flow{server = Flow, client = Flow,
+ pending = {_Ref, TRef}}})
+ when Flow =:= not Active ->
{ok, cancel} = timer:cancel(TRef),
- {noreply, issue_flow(not Active, State)};
+ {noreply, issue_flow(Flow, State)};
+handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unsolicited channel.flow_ok", []);
+handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);
handle_method(_MethodRecord, _Content, _State) ->
@@ -873,17 +875,12 @@ handle_method(_MethodRecord, _Content, _State) ->
-flow_control(Active, State = #ch{active = {pending, NotActive, Refs}})
- when NotActive =:= not Active ->
- noreply(State#ch{active = {invert, NotActive, Refs}});
-flow_control(Active, State = #ch{active = {invert, Active, Refs}}) ->
- noreply(State#ch{active = {pending, Active, Refs}});
-flow_control(Active, State = #ch{active = NotActive})
- when NotActive =:= not Active ->
+flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
+ when Flow =:= not Active ->
ok = clear_permission_cache(),
noreply(issue_flow(Active, State));
-flow_control(_Active, State) ->
- noreply(State).
+flow_control(Active, State = #ch{flow = F}) ->
+ noreply(State#ch{flow = F#flow{server = Active}}).
issue_flow(Active, State) ->
ok = rabbit_writer:send_command(
@@ -891,7 +888,8 @@ issue_flow(Active, State) ->
Ref = make_ref(),
{ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
[self(), Ref]),
- State#ch{active = {pending, Active, {Ref, TRef}}}.
+ State#ch{flow = #flow{server = Active, client = not Active,
+ pending = {Ref, TRef}}}.
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->