diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-24 12:54:38 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-24 12:54:38 +0100 |
commit | b6014122cd0c1a1556d07fe5fe80704680c1621a (patch) | |
tree | 5ce6a3cbb21b3ef7419a0786ba0ac789122f8b9c | |
parent | 1e74500391dc74f3b8ddef467cbfb6c0e6820e35 (diff) | |
download | rabbitmq-server-b6014122cd0c1a1556d07fe5fe80704680c1621a.tar.gz |
5) if we get a flow-ok but then still get a publish, throw away the publish and close the channel with an appropriate error
-rw-r--r-- | src/rabbit_channel.erl | 31 |
1 files changed, 17 insertions, 14 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a48db9c8..5101cce2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -46,7 +46,7 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking}). + consumer_mapping, blocking, blocked}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -153,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> virtual_host = VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), - blocking = dict:new()}, + blocking = dict:new(), + blocked = false}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -212,7 +213,7 @@ handle_cast({conserve_memory, Conserve}, State) -> ok = clear_permission_cache(), ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - noreply(State). + noreply(State#ch{blocked = {pending, Conserve}}). handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -370,13 +371,16 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{exchange = ExchangeNameBin, +handle_method(#'basic.publish'{}, _Content, #ch{blocked = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "Publish received after flow_ok", []); +handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, - mandatory = Mandatory, - immediate = Immediate}, - Content, State = #ch{ virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + mandatory = Mandatory, + immediate = Immediate}, + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -828,11 +832,10 @@ handle_method(#'channel.flow'{active = false}, _, blocking = dict:from_list(Queues)}} end; -handle_method(#'channel.flow_ok'{active = _}, _, State) -> - %% TODO: We may want to correlate this to channel.flow messages we - %% have sent, and complain if we get an unsolicited - %% channel.flow_ok, or the client refuses our flow request. - {noreply, State}; +handle_method(#'channel.flow_ok'{active = Active}, _, + State = #ch{blocked = {pending, Conserve}}) + when Active =:= not Conserve -> + {noreply, State#ch{blocked = Conserve}}; handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( |