summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-09 11:32:36 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-09 11:32:36 +0000
commite68984c2f411371d8d0512557639c30b48ffa8da (patch)
treee27e301ab937bc1c4576b13433c337a69a1e267f
parent6212861309962a136e7b2ed0480d77c00dcb395b (diff)
parent84016d39416c8b5b08783cdbf4fbbdaacdb82aeb (diff)
downloadrabbitmq-server-e68984c2f411371d8d0512557639c30b48ffa8da.tar.gz
merge default into bug25938
-rw-r--r--src/rabbit_amqqueue_process.erl146
-rw-r--r--src/rabbit_binary_generator.erl22
-rw-r--r--src/rabbit_channel.erl102
-rw-r--r--src/rabbit_limiter.erl12
-rw-r--r--src/rabbit_nodes.erl11
-rw-r--r--src/rabbit_queue_consumers.erl62
-rw-r--r--src/rabbit_queue_decorator.erl13
7 files changed, 182 insertions, 186 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 281aecb9..d916dccb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -126,7 +126,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3 = lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries),
- notify_decorators(startup, [], State3),
+ notify_decorators(startup, State3),
State3.
init_state(Q) ->
@@ -188,7 +188,7 @@ declare(Recover, From, State = #q{q = Q,
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
- notify_decorators(startup, [], State),
+ notify_decorators(startup, State),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -213,18 +213,17 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
-notify_decorators(Event, Props, State) when Event =:= startup;
- Event =:= shutdown ->
- decorator_callback(qname(State), Event, Props);
+maybe_notify_decorators(false, State) -> State;
+maybe_notify_decorators(true, State) -> notify_decorators(State), State.
-notify_decorators(Event, Props, State = #q{consumers = Consumers,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+notify_decorators(Event, State) -> decorator_callback(qname(State), Event, []).
+
+notify_decorators(State = #q{consumers = Consumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
P = rabbit_queue_consumers:max_active_priority(Consumers),
- decorator_callback(qname(State), notify,
- [Event, [{max_active_consumer_priority, P},
- {is_empty, BQ:is_empty(BQS)} |
- Props]]).
+ decorator_callback(qname(State), consumer_state_changed,
+ [P, BQ:is_empty(BQS)]).
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
@@ -308,7 +307,7 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
- notify_decorators(shutdown, [], State),
+ notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName) ||
{Ch, CTag, _, _} <-
rabbit_queue_consumers:all(Consumers)],
@@ -401,21 +400,12 @@ is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
- true -> notify_decorators(queue_empty, [], State),
+ true -> notify_decorators(State),
rabbit_queue_consumers:send_drained();
false -> ok
end,
State.
-deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) ->
- {Active, Blocked, State1, Consumers1} =
- rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State,
- Consumers),
- State2 = State1#q{consumers = Consumers1},
- [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) ||
- {_ChPid, CTag} <- Blocked],
- {Active, State2}.
-
confirm_messages([], State) ->
State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
@@ -461,49 +451,68 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State) ->
- {_Active, State3} = deliver_msgs_to_consumers(
- fun(AckRequired, State1) ->
- {Result, State2} = fetch(AckRequired, State1),
- {Result, is_empty(State2), State2}
- end, is_empty(State), State),
- State3.
+run_message_queue(State) -> run_message_queue(false, State).
+
+run_message_queue(ActiveConsumersChanged, State) ->
+ case is_empty(State) of
+ true -> maybe_notify_decorators(ActiveConsumersChanged, State);
+ false -> case rabbit_queue_consumers:deliver(
+ fun(AckRequired) -> fetch(AckRequired, State) end,
+ qname(State), State#q.consumers) of
+ {delivered, ActiveConsumersChanged1, State1, Consumers} ->
+ run_message_queue(
+ ActiveConsumersChanged or ActiveConsumersChanged1,
+ State1#q{consumers = Consumers});
+ {undelivered, ActiveConsumersChanged1, Consumers} ->
+ maybe_notify_decorators(
+ ActiveConsumersChanged or ActiveConsumersChanged1,
+ State#q{consumers = Consumers})
+ end
+ end.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
- State1 = State#q{backing_queue_state = BQS1},
- case IsDuplicate of
- false -> deliver_msgs_to_consumers(
- fun (true, State2 = #q{backing_queue_state = BQS2}) ->
- true = BQ:is_empty(BQS2),
- {AckTag, BQS3} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS2),
- {{Message, Delivered, AckTag},
- true, State2#q{backing_queue_state = BQS3}};
- (false, State2) ->
- {{Message, Delivered, undefined},
- true, discard(Delivery, State2)}
- end, false, State1);
- true -> {true, State1}
+ case rabbit_queue_consumers:deliver(
+ fun (true) -> true = BQ:is_empty(BQS),
+ {AckTag, BQS1} = BQ:publish_delivered(
+ Message, Props, SenderPid, BQS),
+ {{Message, Delivered, AckTag},
+ State#q{backing_queue_state = BQS1}};
+ (false) -> {{Message, Delivered, undefined},
+ discard(Delivery, State)}
+ end, qname(State), State#q.consumers) of
+ {delivered, ActiveConsumersChanged, State1, Consumers} ->
+ {delivered, maybe_notify_decorators(
+ ActiveConsumersChanged,
+ State1#q{consumers = Consumers})};
+ {undelivered, ActiveConsumersChanged, Consumers} ->
+ {undelivered, maybe_notify_decorators(
+ ActiveConsumersChanged,
+ State#q{consumers = Consumers})}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
- Delivered, State) ->
+ Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State),
- case attempt_delivery(Delivery, Props, Delivered, State1) of
- {true, State2} ->
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State2 = State1#q{backing_queue_state = BQS1},
+ case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
+ State2) of
+ true ->
State2;
+ {delivered, State3} ->
+ State3;
%% The next one is an optimisation
- {false, State2 = #q{ttl = 0, dlx = undefined}} ->
- discard(Delivery, State2);
- {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- {Dropped, State3 = #q{backing_queue_state = BQS2}} =
- maybe_drop_head(State2#q{backing_queue_state = BQS1}),
- QLen = BQ:len(BQS2),
+ {undelivered, State3 = #q{ttl = 0, dlx = undefined}} ->
+ discard(Delivery, State3);
+ {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
+ BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2),
+ {Dropped, State4 = #q{backing_queue_state = BQS4}} =
+ maybe_drop_head(State3#q{backing_queue_state = BQS3}),
+ QLen = BQ:len(BQS4),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
@@ -512,9 +521,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
- {false, false, _} -> State3;
- {true, true, undefined} -> State3;
- {_, _, _} -> drop_expired_msgs(State3)
+ {false, false, _} -> State4;
+ {true, true, undefined} -> State4;
+ {_, _, _} -> drop_expired_msgs(State4)
end
end.
@@ -566,13 +575,9 @@ requeue(AckTags, ChPid, State) ->
possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of
- unchanged ->
- State;
- {unblocked, UnblockedCTags, Consumers1} ->
- State1 = State#q{consumers = Consumers1},
- [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}],
- State1) || CTag <- UnblockedCTags],
- run_message_queue(State1)
+ unchanged -> State;
+ {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
+ run_message_queue(true, State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -599,8 +604,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
end,
State2 = State1#q{consumers = Consumers1,
exclusive_consumer = Holder1},
- [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
- CTag <- ChCTags],
+ notify_decorators(State2),
case should_auto_delete(State2) of
true -> {stop, State2};
false -> {ok, requeue_and_run(ChAckTags,
@@ -1034,8 +1038,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State1), OtherArgs),
- notify_decorators(
- basic_consume, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1054,8 +1057,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State1 = State#q{consumers = Consumers1,
exclusive_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)),
- notify_decorators(
- basic_cancel, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
@@ -1218,7 +1220,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
ChPid, State));
handle_cast(notify_decorators, State) ->
- notify_decorators(refresh, [], State),
+ notify_decorators(State),
noreply(State);
handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 8eaac10d..83f68ed3 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -120,18 +120,18 @@ table_field_to_binary({FName, T, V}) ->
[short_string_to_binary(FName) | field_value_to_binary(T, V)].
field_value_to_binary(longstr, V) -> [$S | long_string_to_binary(V)];
-field_value_to_binary(signedint, V) -> [$I | <<V:32/signed>>];
+field_value_to_binary(signedint, V) -> [$I, <<V:32/signed>>];
field_value_to_binary(decimal, V) -> {Before, After} = V,
- [$D | [Before, <<After:32>>]];
-field_value_to_binary(timestamp, V) -> [$T | <<V:64>>];
+ [$D, Before, <<After:32>>];
+field_value_to_binary(timestamp, V) -> [$T, <<V:64>>];
field_value_to_binary(table, V) -> [$F | table_to_binary(V)];
field_value_to_binary(array, V) -> [$A | array_to_binary(V)];
-field_value_to_binary(byte, V) -> [$b | <<V:8/unsigned>>];
-field_value_to_binary(double, V) -> [$d | <<V:64/float>>];
-field_value_to_binary(float, V) -> [$f | <<V:32/float>>];
-field_value_to_binary(long, V) -> [$l | <<V:64/signed>>];
-field_value_to_binary(short, V) -> [$s | <<V:16/signed>>];
-field_value_to_binary(bool, V) -> [$t | [if V -> 1; true -> 0 end]];
+field_value_to_binary(byte, V) -> [$b, <<V:8/unsigned>>];
+field_value_to_binary(double, V) -> [$d, <<V:64/float>>];
+field_value_to_binary(float, V) -> [$f, <<V:32/float>>];
+field_value_to_binary(long, V) -> [$l, <<V:64/signed>>];
+field_value_to_binary(short, V) -> [$s, <<V:16/signed>>];
+field_value_to_binary(bool, V) -> [$t, if V -> 1; true -> 0 end];
field_value_to_binary(binary, V) -> [$x | long_string_to_binary(V)];
field_value_to_binary(void, _V) -> [$V].
@@ -154,13 +154,13 @@ generate_array_iolist(Array) ->
short_string_to_binary(String) ->
Len = string_length(String),
- if Len < 256 -> [<<Len:8>> | String];
+ if Len < 256 -> [<<Len:8>>, String];
true -> exit(content_properties_shortstr_overflow)
end.
long_string_to_binary(String) ->
Len = string_length(String),
- [<<Len:32>> | String].
+ [<<Len:32>>, String].
string_length(String) when is_binary(String) -> size(String);
string_length(String) -> length(String).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4d778f94..eded8a90 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -694,12 +694,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
multiple = Multiple,
- requeue = Requeue},
- _, State) ->
+ requeue = Requeue}, _, State) ->
reject(DeliveryTag, Requeue, Multiple, State);
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
- multiple = Multiple},
+ multiple = Multiple},
_, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
@@ -710,8 +709,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
State1#ch{tx = {Msgs, Acks1}}
end};
-handle_method(#'basic.get'{queue = QueueNameBin,
- no_ack = NoAck},
+handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
conn_pid = ConnPid,
limiter = Limiter,
@@ -749,9 +747,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait,
arguments = Args},
- _, State = #ch{conn_pid = ConnPid,
- limiter = Limiter,
- consumer_mapping = ConsumerMapping}) ->
+ _, State = #ch{conn_pid = ConnPid,
+ limiter = Limiter,
+ consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -801,8 +799,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag])
end;
-handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
- nowait = NoWait},
+handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
_, State = #ch{consumer_mapping = ConsumerMapping,
queue_consumers = QCons}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
@@ -849,13 +846,13 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
-handle_method(#'basic.qos'{prefetch_count = 0}, _,
- State = #ch{limiter = Limiter}) ->
+handle_method(#'basic.qos'{prefetch_count = 0},
+ _, State = #ch{limiter = Limiter}) ->
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
-handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
- State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
%% TODO queue:len(UAMQ) is not strictly right since that counts
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
@@ -864,8 +861,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
handle_method(#'basic.recover_async'{requeue = true},
- _, State = #ch{unacked_message_q = UAMQ,
- limiter = Limiter}) ->
+ _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
OkFun = fun () -> ok end,
UAMQL = queue:to_list(UAMQ),
foreach_per_queue(
@@ -887,19 +883,18 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
Content, State),
{reply, #'basic.recover_ok'{}, State1};
-handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
- requeue = Requeue},
+handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue},
_, State) ->
reject(DeliveryTag, Requeue, false, State);
-handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- type = TypeNameBin,
- passive = false,
- durable = Durable,
+handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
+ type = TypeNameBin,
+ passive = false,
+ durable = Durable,
auto_delete = AutoDelete,
- internal = Internal,
- nowait = NoWait,
- arguments = Args},
+ internal = Internal,
+ nowait = NoWait,
+ arguments = Args},
_, State = #ch{virtual_host = VHostPath}) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -932,17 +927,17 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- passive = true,
- nowait = NoWait},
+ passive = true,
+ nowait = NoWait},
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName),
return_ok(State, NoWait, #'exchange.declare_ok'{});
-handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
+handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused,
- nowait = NoWait},
+ nowait = NoWait},
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_not_default_exchange(ExchangeName),
@@ -958,19 +953,19 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
end;
handle_method(#'exchange.bind'{destination = DestinationNameBin,
- source = SourceNameBin,
+ source = SourceNameBin,
routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:add/2,
SourceNameBin, exchange, DestinationNameBin, RoutingKey,
Arguments, #'exchange.bind_ok'{}, NoWait, State);
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
- source = SourceNameBin,
+ source = SourceNameBin,
routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:remove/2,
SourceNameBin, exchange, DestinationNameBin, RoutingKey,
Arguments, #'exchange.unbind_ok'{}, NoWait, State);
@@ -1062,10 +1057,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
-handle_method(#'queue.delete'{queue = QueueNameBin,
+handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
- if_empty = IfEmpty,
- nowait = NoWait},
+ if_empty = IfEmpty,
+ nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
@@ -1087,25 +1082,24 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
#'queue.delete_ok'{message_count = PurgedMessageCount})
end;
-handle_method(#'queue.bind'{queue = QueueNameBin,
- exchange = ExchangeNameBin,
+handle_method(#'queue.bind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:add/2,
ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
#'queue.bind_ok'{}, NoWait, State);
-handle_method(#'queue.unbind'{queue = QueueNameBin,
- exchange = ExchangeNameBin,
+handle_method(#'queue.unbind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
routing_key = RoutingKey,
- arguments = Arguments}, _, State) ->
+ arguments = Arguments}, _, State) ->
binding_action(fun rabbit_binding:remove/2,
ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
#'queue.unbind_ok'{}, false, State);
-handle_method(#'queue.purge'{queue = QueueNameBin,
- nowait = NoWait},
+handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
@@ -1153,15 +1147,15 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'channel.flow'{active = true}, _,
- State = #ch{limiter = Limiter}) ->
+handle_method(#'channel.flow'{active = true},
+ _, State = #ch{limiter = Limiter}) ->
Limiter1 = rabbit_limiter:unblock(Limiter),
{reply, #'channel.flow_ok'{active = true},
maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
-handle_method(#'channel.flow'{active = false}, _,
- State = #ch{consumer_mapping = Consumers,
- limiter = Limiter}) ->
+handle_method(#'channel.flow'{active = false},
+ _, State = #ch{consumer_mapping = Consumers,
+ limiter = Limiter}) ->
case rabbit_limiter:is_blocked(Limiter) of
true -> {noreply, maybe_send_flow_ok(State)};
false -> Limiter1 = rabbit_limiter:block(Limiter),
@@ -1186,8 +1180,8 @@ handle_method(#'channel.flow'{active = false}, _,
handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
- drain = Drain}, _,
- State = #ch{consumer_mapping = Consumers}) ->
+ drain = Drain},
+ _, State = #ch{consumer_mapping = Consumers}) ->
case dict:find(CTag, Consumers) of
{ok, Q} -> ok = rabbit_amqqueue:credit(
Q, self(), CTag, Credit, Drain),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 10e00fa3..c33b3c74 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -245,9 +245,9 @@ can_send(L = #qstate{pid = Pid, state = State, credits = Credits},
case is_consumer_blocked(L, CTag) of
false -> case (State =/= active orelse
safe_call(Pid, {can_send, self(), AckRequired}, true)) of
- true -> {continue, L#qstate{
- credits = record_send_q(CTag, Credits)}};
- false -> {suspend, L#qstate{state = suspended}}
+ true -> Credits1 = decrement_credit(CTag, Credits),
+ {continue, L#qstate{credits = Credits1}};
+ false -> {suspend, L#qstate{state = suspended}}
end;
true -> {suspend, L}
end.
@@ -271,9 +271,9 @@ is_suspended(#qstate{}) -> false.
is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
case gb_trees:lookup(CTag, Credits) of
+ none -> false;
{value, #credit{credit = C}} when C > 0 -> false;
- {value, #credit{}} -> true;
- none -> false
+ {value, #credit{}} -> true
end.
credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) ->
@@ -305,7 +305,7 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
%% state for us (#qstate.credits), and maintain a fiction that the
%% limiter is making the decisions...
-record_send_q(CTag, Credits) ->
+decrement_credit(CTag, Credits) ->
case gb_trees:lookup(CTag, Credits) of
{value, #credit{credit = Credit, drain = Drain}} ->
update_credit(CTag, Credit - 1, Drain, Credits);
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index b54fdd2e..5a1613a7 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -17,7 +17,9 @@
-module(rabbit_nodes).
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
- is_running/2, is_process_running/2]).
+ is_running/2, is_process_running/2, fqdn_nodename/0]).
+
+-include_lib("kernel/include/inet.hrl").
-define(EPMD_TIMEOUT, 30000).
@@ -35,6 +37,7 @@
-spec(cookie_hash/0 :: () -> string()).
-spec(is_running/2 :: (node(), atom()) -> boolean()).
-spec(is_process_running/2 :: (node(), atom()) -> boolean()).
+-spec(fqdn_nodename/0 :: () -> binary()).
-endif.
@@ -107,3 +110,9 @@ is_process_running(Node, Process) ->
undefined -> false;
P when is_pid(P) -> true
end.
+
+fqdn_nodename() ->
+ {ID, _} = rabbit_nodes:parts(node()),
+ {ok, Host} = inet:gethostname(),
+ {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
+ list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 702091dc..f06423f7 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,7 +18,7 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
- send_drained/0, deliver/5, record_ack/3, subtract_acks/2,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
utilisation/1]).
@@ -76,14 +76,14 @@
'not_found' | {[ack()], [rabbit_types:ctag()],
state()}.
-spec send_drained() -> 'ok'.
--spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}),
- boolean(), rabbit_amqqueue:name(), T, state()) ->
- {boolean(), [{ch(), rabbit_types:ctag()}], T, state()}.
+-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
+ rabbit_amqqueue:name(), state()) ->
+ {'delivered', boolean(), T, state()} |
+ {'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
- 'unchanged' |
- {'unblocked', [rabbit_types:ctag()], state()}.
+ 'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
-spec activate_limit_fun() -> cr_fun().
@@ -181,44 +181,41 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, Stop, QName, S, State) ->
- deliver(FetchFun, Stop, QName, [], S, State).
+deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State).
-deliver(_FetchFun, true, _QName, Blocked, S, State) ->
- {true, Blocked, S, State};
-deliver( FetchFun, false, QName, Blocked, S,
- State = #state{consumers = Consumers, use = Use}) ->
+deliver(FetchFun, QName, ConsumersChanged,
+ State = #state{consumers = Consumers}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {false, Blocked, S, State#state{use = update_use(Use, inactive)}};
+ {undelivered, ConsumersChanged,
+ State#state{use = update_use(State#state.use, inactive)}};
{{value, QEntry, Priority}, Tail} ->
- {Stop, Blocked1, S1, Consumers1} =
- deliver_to_consumer(FetchFun, QEntry, Priority, QName,
- Blocked, S, Tail),
- deliver(FetchFun, Stop, QName, Blocked1, S1,
- State#state{consumers = Consumers1})
+ case deliver_to_consumer(FetchFun, QEntry, QName) of
+ {delivered, R} ->
+ {delivered, ConsumersChanged, R,
+ State#state{consumers = priority_queue:in(QEntry, Priority,
+ Tail)}};
+ undelivered ->
+ deliver(FetchFun, QName, true,
+ State#state{consumers = Tail})
+ end
end.
-deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName,
- Blocked, S, Consumers) ->
+deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
- {false, Blocked1, S, Consumers};
+ undelivered;
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
- Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
- {false, Blocked1, S, Consumers};
+ undelivered;
{continue, Limiter} ->
- {Stop, S1} = deliver_to_consumer(
- FetchFun, Consumer,
- C#cr{limiter = Limiter}, QName, S),
- {Stop, Blocked, S1,
- priority_queue:in(E, Priority, Consumers)}
+ {delivered, deliver_to_consumer(
+ FetchFun, Consumer,
+ C#cr{limiter = Limiter}, QName)}
end
end.
@@ -228,8 +225,8 @@ deliver_to_consumer(FetchFun,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
- QName, S) ->
- {{Message, IsDelivered, AckTag}, Stop, S1} = FetchFun(AckRequired, S),
+ QName) ->
+ {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
@@ -238,7 +235,7 @@ deliver_to_consumer(FetchFun,
end,
update_ch_record(C#cr{acktags = ChAckTags1,
unsent_message_count = Count + 1}),
- {Stop, S1}.
+ R.
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
@@ -290,7 +287,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
{unblocked,
- tags(Unblocked),
State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
use = update_use(Use, active)}}
end.
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 8f6375a5..6205e2dc 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -8,13 +8,6 @@
-ifdef(use_specs).
--type(notify_event() :: 'consumer_blocked' |
- 'consumer_unblocked' |
- 'queue_empty' |
- 'basic_consume' |
- 'basic_cancel' |
- 'refresh').
-
-callback startup(rabbit_types:amqqueue()) -> 'ok'.
-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
@@ -24,7 +17,9 @@
-callback active_for(rabbit_types:amqqueue()) -> boolean().
--callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'.
+%% called with Queue, MaxActivePriority, IsEmpty
+-callback consumer_state_changed(
+ rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'.
-else.
@@ -32,7 +27,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
- {active_for, 1}, {notify, 3}];
+ {active_for, 1}, {consumer_state_changed, 3}];
behaviour_info(_Other) ->
undefined.