summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-24 14:57:51 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-24 14:57:51 +0100
commitf225f3fbad2e7abd2b1c8432d6d849a13a2271e0 (patch)
tree835e2fe5deaff4fb6a858895dbb7fc3c418e8c36
parent0c46f034c74a55636c497b4353d5cc0e6d48428f (diff)
downloadrabbitmq-server-f225f3fbad2e7abd2b1c8432d6d849a13a2271e0.tar.gz
3) wait for some reasonable time to receive a flow-ok; 4) if we don't get a flow-ok then close the channel with an appropriate error
-rw-r--r--src/rabbit_channel.erl146
1 files changed, 92 insertions, 54 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 560131be..3f1bf940 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -39,6 +39,8 @@
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
+-export([flow_timeout/2]).
+
-export([init/1, terminate/2, code_change/3,
handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
@@ -49,6 +51,7 @@
consumer_mapping, blocking, active}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
-define(INFO_KEYS,
[pid,
@@ -66,6 +69,8 @@
-ifdef(use_specs).
+-type(ref() :: any()).
+
-spec(start_link/5 ::
(channel_number(), pid(), pid(), username(), vhost()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
@@ -75,6 +80,7 @@
-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
+-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
@@ -113,6 +119,9 @@ conserve_memory(Pid, Conserve) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
+flow_timeout(Pid, Ref) ->
+ gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -209,18 +218,14 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, Conserve},
- State = #ch{active = {pending, Conserve}}) ->
- noreply(State#ch{active = {invert, Conserve}});
-handle_cast({conserve_memory, Conserve},
- State = #ch{active = {invert, Conserve}}) ->
- noreply(State#ch{active = {pending, Conserve}});
-handle_cast({conserve_memory, Conserve}, State = #ch{active = Conserve}) ->
- ok = clear_permission_cache(),
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = not Conserve}),
- noreply(State#ch{active = {pending, not Conserve}});
-handle_cast({conserve_memory, _Conserve}, State) ->
+handle_cast({conserve_memory, Conserve}, State) ->
+ flow_control(not Conserve, State);
+
+handle_cast({flow_timeout, Ref}, #ch{active = {_Atom, Active, {Ref, _TRef}}}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "timeout waiting for channel.flow_ok{active=~w}", [Active]);
+handle_cast({flow_timeout, _Ref}, State) ->
noreply(State).
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
@@ -379,51 +384,61 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{}, _Content, #ch{active = Active})
- when Active =:= false orelse Active =:= {pending, true} orelse
- Active =:= {invert, true} ->
- rabbit_misc:protocol_error(
- precondition_failed,
- "basic.publish received after channel.flow_ok{active=false}", []);
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
- ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_write_permitted(ExchangeName, State),
- Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
- %% We decode the content's properties here because we're almost
- %% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- IsPersistent = is_message_persistent(DecodedContent),
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
- case RoutingRes of
- routed ->
- ok;
- unroutable ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
- not_delivered ->
- %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
- end,
- {noreply, case TxnKey of
- none -> State;
- _ -> add_tx_participants(DeliveredQPids, State)
- end};
+ writer_pid = WriterPid,
+ active = Active}) ->
+ case (case Active of
+ false -> false;
+ {_Atom, true, _Refs} -> false;
+ _ -> true
+ end) of
+ false ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "basic.publish received after channel.flow_ok{active=false}", []);
+ true ->
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_write_permitted(ExchangeName, State),
+ Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+ %% We decode the content's properties here because we're
+ %% almost certain to want to look at delivery-mode and
+ %% priority.
+ DecodedContent =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ IsPersistent = is_message_persistent(DecodedContent),
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = DecodedContent,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent},
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
+ case RoutingRes of
+ routed ->
+ ok;
+ unroutable ->
+ %% FIXME: 312 should be replaced by the ?NO_ROUTE
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 312,
+ <<"unroutable">>);
+ not_delivered ->
+ %% FIXME: 313 should be replaced by the
+ %% ?NO_CONSUMERS definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 313,
+ <<"not_delivered">>)
+ end,
+ {noreply, case TxnKey of
+ none -> State;
+ _ -> add_tx_participants(DeliveredQPids, State)
+ end}
+ end;
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -844,13 +859,18 @@ handle_method(#'channel.flow'{active = false}, _,
end;
handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{active = {pending, Active}}) ->
+ State = #ch{active = {pending, Active, {_Ref, TRef}}}) ->
+ {ok, cancel} = timer:cancel(TRef),
noreply(State#ch{active = Active});
handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{active = {invert, Active}}) ->
+ State = #ch{active = {invert, Active, {_Ref, TRef}}}) ->
+ {ok, cancel} = timer:cancel(TRef),
ok = rabbit_writer:send_command(
State#ch.writer_pid, #'channel.flow'{active = not Active}),
- noreply(State#ch{active = {pending, not Active}});
+ Ref = make_ref(),
+ {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
+ [self(), Ref]),
+ noreply(State#ch{active = {pending, not Active, {Ref, TRef}}});
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -858,6 +878,24 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+flow_control(Active, State = #ch{active = {pending, NotActive, Refs}})
+ when NotActive =:= not Active ->
+ noreply(State#ch{active = {invert, NotActive, Refs}});
+flow_control(Active, State = #ch{active = {invert, NotActive, Refs}})
+ when NotActive =:= not Active ->
+ noreply(State#ch{active = {pending, NotActive, Refs}});
+flow_control(Active, State = #ch{active = NotActive})
+ when NotActive =:= not Active ->
+ ok = clear_permission_cache(),
+ ok = rabbit_writer:send_command(
+ State#ch.writer_pid, #'channel.flow'{active = Active}),
+ Ref = make_ref(),
+ {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
+ [self(), Ref]),
+ noreply(State#ch{active = {pending, Active, {Ref, TRef}}});
+flow_control(_Active, State) ->
+ noreply(State).
+
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
%% FIXME: connection exception (!) on failure??