diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-24 14:57:51 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-24 14:57:51 +0100 |
commit | f225f3fbad2e7abd2b1c8432d6d849a13a2271e0 (patch) | |
tree | 835e2fe5deaff4fb6a858895dbb7fc3c418e8c36 | |
parent | 0c46f034c74a55636c497b4353d5cc0e6d48428f (diff) | |
download | rabbitmq-server-f225f3fbad2e7abd2b1c8432d6d849a13a2271e0.tar.gz |
3) wait for some reasonable time to receive a flow-ok; 4) if we don't get a flow-ok then close the channel with an appropriate error
-rw-r--r-- | src/rabbit_channel.erl | 146 |
1 files changed, 92 insertions, 54 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 560131be..3f1bf940 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -39,6 +39,8 @@ -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). +-export([flow_timeout/2]). + -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -49,6 +51,7 @@ consumer_mapping, blocking, active}). -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds -define(INFO_KEYS, [pid, @@ -66,6 +69,8 @@ -ifdef(use_specs). +-type(ref() :: any()). + -spec(start_link/5 :: (channel_number(), pid(), pid(), username(), vhost()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). @@ -75,6 +80,7 @@ -spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). +-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). @@ -113,6 +119,9 @@ conserve_memory(Pid, Conserve) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). +flow_timeout(Pid, Ref) -> + gen_server2:pcast(Pid, 7, {flow_timeout, Ref}). + list() -> pg_local:get_members(rabbit_channels). @@ -209,18 +218,14 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast({conserve_memory, Conserve}, - State = #ch{active = {pending, Conserve}}) -> - noreply(State#ch{active = {invert, Conserve}}); -handle_cast({conserve_memory, Conserve}, - State = #ch{active = {invert, Conserve}}) -> - noreply(State#ch{active = {pending, Conserve}}); -handle_cast({conserve_memory, Conserve}, State = #ch{active = Conserve}) -> - ok = clear_permission_cache(), - ok = rabbit_writer:send_command( - State#ch.writer_pid, #'channel.flow'{active = not Conserve}), - noreply(State#ch{active = {pending, not Conserve}}); -handle_cast({conserve_memory, _Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, State) -> + flow_control(not Conserve, State); + +handle_cast({flow_timeout, Ref}, #ch{active = {_Atom, Active, {Ref, _TRef}}}) -> + rabbit_misc:protocol_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", [Active]); +handle_cast({flow_timeout, _Ref}, State) -> noreply(State). handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, @@ -379,51 +384,61 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{}, _Content, #ch{active = Active}) - when Active =:= false orelse Active =:= {pending, true} orelse - Active =:= {invert, true} -> - rabbit_misc:protocol_error( - precondition_failed, - "basic.publish received after channel.flow_ok{active=false}", []); handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, transaction_id = TxnKey, - writer_pid = WriterPid}) -> - ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_write_permitted(ExchangeName, State), - Exchange = rabbit_exchange:lookup_or_die(ExchangeName), - %% We decode the content's properties here because we're almost - %% certain to want to look at delivery-mode and priority. - DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - IsPersistent = is_message_persistent(DecodedContent), - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent}, - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish( - Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), - case RoutingRes of - routed -> - ok; - unroutable -> - %% FIXME: 312 should be replaced by the ?NO_ROUTE - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 312, <<"unroutable">>); - not_delivered -> - %% FIXME: 313 should be replaced by the ?NO_CONSUMERS - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>) - end, - {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) - end}; + writer_pid = WriterPid, + active = Active}) -> + case (case Active of + false -> false; + {_Atom, true, _Refs} -> false; + _ -> true + end) of + false -> + rabbit_misc:protocol_error( + precondition_failed, + "basic.publish received after channel.flow_ok{active=false}", []); + true -> + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_write_permitted(ExchangeName, State), + Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + %% We decode the content's properties here because we're + %% almost certain to want to look at delivery-mode and + %% priority. + DecodedContent = + rabbit_binary_parser:ensure_content_decoded(Content), + IsPersistent = is_message_persistent(DecodedContent), + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), + case RoutingRes of + routed -> + ok; + unroutable -> + %% FIXME: 312 should be replaced by the ?NO_ROUTE + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 312, + <<"unroutable">>); + not_delivered -> + %% FIXME: 313 should be replaced by the + %% ?NO_CONSUMERS definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 313, + <<"not_delivered">>) + end, + {noreply, case TxnKey of + none -> State; + _ -> add_tx_participants(DeliveredQPids, State) + end} + end; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -844,13 +859,18 @@ handle_method(#'channel.flow'{active = false}, _, end; handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{active = {pending, Active}}) -> + State = #ch{active = {pending, Active, {_Ref, TRef}}}) -> + {ok, cancel} = timer:cancel(TRef), noreply(State#ch{active = Active}); handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{active = {invert, Active}}) -> + State = #ch{active = {invert, Active, {_Ref, TRef}}}) -> + {ok, cancel} = timer:cancel(TRef), ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not Active}), - noreply(State#ch{active = {pending, not Active}}); + Ref = make_ref(), + {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, + [self(), Ref]), + noreply(State#ch{active = {pending, not Active, {Ref, TRef}}}); handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -858,6 +878,24 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +flow_control(Active, State = #ch{active = {pending, NotActive, Refs}}) + when NotActive =:= not Active -> + noreply(State#ch{active = {invert, NotActive, Refs}}); +flow_control(Active, State = #ch{active = {invert, NotActive, Refs}}) + when NotActive =:= not Active -> + noreply(State#ch{active = {pending, NotActive, Refs}}); +flow_control(Active, State = #ch{active = NotActive}) + when NotActive =:= not Active -> + ok = clear_permission_cache(), + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = Active}), + Ref = make_ref(), + {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, + [self(), Ref]), + noreply(State#ch{active = {pending, Active, {Ref, TRef}}}); +flow_control(_Active, State) -> + noreply(State). + binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> %% FIXME: connection exception (!) on failure?? |