summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-24 14:05:49 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-24 14:05:49 +0100
commit0c46f034c74a55636c497b4353d5cc0e6d48428f (patch)
tree91d8537eb40e6e5849344c533c67926e9bdf2474
parentb6014122cd0c1a1556d07fe5fe80704680c1621a (diff)
downloadrabbitmq-server-0c46f034c74a55636c497b4353d5cc0e6d48428f.tar.gz
active is much easier to think about than blocked. Deal with the possibility of receiving various conserve_memory messages whilst waiting for a flow_ok to come back from a client
-rw-r--r--src/rabbit_channel.erl35
1 files changed, 25 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5101cce2..560131be 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -46,7 +46,7 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, blocked}).
+ consumer_mapping, blocking, active}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -154,7 +154,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
- blocked = false},
+ active = true},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -209,11 +209,19 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, Conserve}, State) ->
+handle_cast({conserve_memory, Conserve},
+ State = #ch{active = {pending, Conserve}}) ->
+ noreply(State#ch{active = {invert, Conserve}});
+handle_cast({conserve_memory, Conserve},
+ State = #ch{active = {invert, Conserve}}) ->
+ noreply(State#ch{active = {pending, Conserve}});
+handle_cast({conserve_memory, Conserve}, State = #ch{active = Conserve}) ->
ok = clear_permission_cache(),
ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- noreply(State#ch{blocked = {pending, Conserve}}).
+ State#ch.writer_pid, #'channel.flow'{active = not Conserve}),
+ noreply(State#ch{active = {pending, not Conserve}});
+handle_cast({conserve_memory, _Conserve}, State) ->
+ noreply(State).
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -371,9 +379,12 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{}, _Content, #ch{blocked = true}) ->
+handle_method(#'basic.publish'{}, _Content, #ch{active = Active})
+ when Active =:= false orelse Active =:= {pending, true} orelse
+ Active =:= {invert, true} ->
rabbit_misc:protocol_error(
- precondition_failed, "Publish received after flow_ok", []);
+ precondition_failed,
+ "basic.publish received after channel.flow_ok{active=false}", []);
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -833,9 +844,13 @@ handle_method(#'channel.flow'{active = false}, _,
end;
handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{blocked = {pending, Conserve}})
- when Active =:= not Conserve ->
- {noreply, State#ch{blocked = Conserve}};
+ State = #ch{active = {pending, Active}}) ->
+ noreply(State#ch{active = Active});
+handle_method(#'channel.flow_ok'{active = Active}, _,
+ State = #ch{active = {invert, Active}}) ->
+ ok = rabbit_writer:send_command(
+ State#ch.writer_pid, #'channel.flow'{active = not Active}),
+ noreply(State#ch{active = {pending, not Active}});
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(