summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-24 12:54:38 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-24 12:54:38 +0100
commitb6014122cd0c1a1556d07fe5fe80704680c1621a (patch)
tree5ce6a3cbb21b3ef7419a0786ba0ac789122f8b9c
parent1e74500391dc74f3b8ddef467cbfb6c0e6820e35 (diff)
downloadrabbitmq-server-b6014122cd0c1a1556d07fe5fe80704680c1621a.tar.gz
5) if we get a flow-ok but then still get a publish, throw away the publish and close the channel with an appropriate error
-rw-r--r--src/rabbit_channel.erl31
1 files changed, 17 insertions, 14 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a48db9c8..5101cce2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -46,7 +46,7 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking}).
+ consumer_mapping, blocking, blocked}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -153,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
- blocking = dict:new()},
+ blocking = dict:new(),
+ blocked = false},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -212,7 +213,7 @@ handle_cast({conserve_memory, Conserve}, State) ->
ok = clear_permission_cache(),
ok = rabbit_writer:send_command(
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- noreply(State).
+ noreply(State#ch{blocked = {pending, Conserve}}).
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -370,13 +371,16 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{exchange = ExchangeNameBin,
+handle_method(#'basic.publish'{}, _Content, #ch{blocked = true}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "Publish received after flow_ok", []);
+handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
- mandatory = Mandatory,
- immediate = Immediate},
- Content, State = #ch{ virtual_host = VHostPath,
- transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ mandatory = Mandatory,
+ immediate = Immediate},
+ Content, State = #ch{virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ writer_pid = WriterPid}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -828,11 +832,10 @@ handle_method(#'channel.flow'{active = false}, _,
blocking = dict:from_list(Queues)}}
end;
-handle_method(#'channel.flow_ok'{active = _}, _, State) ->
- %% TODO: We may want to correlate this to channel.flow messages we
- %% have sent, and complain if we get an unsolicited
- %% channel.flow_ok, or the client refuses our flow request.
- {noreply, State};
+handle_method(#'channel.flow_ok'{active = Active}, _,
+ State = #ch{blocked = {pending, Conserve}})
+ when Active =:= not Conserve ->
+ {noreply, State#ch{blocked = Conserve}};
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(