summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-05-24 14:12:09 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-05-24 14:12:09 +0100
commit7b79de595d6b6fb5db1478072b81c22e5723b999 (patch)
tree8624246108bc60bfb364002ec05313839921dd84
parent89b3be6ef4138452869b82324c582a1aff72c6c8 (diff)
downloadrabbitmq-server-7b79de595d6b6fb5db1478072b81c22e5723b999.tar.gz
If we have local active consumers, ignore federated ones.
-rw-r--r--src/rabbit_amqqueue_process.erl27
1 files changed, 20 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 71dbb9de..4c3751ba 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -426,7 +426,7 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case queue:out(ActiveConsumers) of
+ case pick_consumer(ActiveConsumers) of
{empty, _} ->
{false, State};
{{value, QEntry}, Tail} ->
@@ -544,12 +544,25 @@ notify_federation(#q{q = Q,
active_unfederated(Cs) ->
case queue:out(Cs) of
- {empty, _} -> false;
- {{value, {_Pid, #consumer{args = Args}}}, Cs1} ->
- case rabbit_misc:table_lookup(Args, <<"x-purpose">>) of
- {longstr, <<"federation">>} -> active_unfederated(Cs1);
- _ -> true
- end
+ {empty, _} -> false;
+ {{value, C}, Cs1} -> case federated_consumer(C) of
+ true -> active_unfederated(Cs1);
+ false -> true
+ end
+ end.
+
+%% TODO this could be more efficient. But we'd need another representation,
+%% and thus to abstract the representation of active_consumers.
+pick_consumer(Cs) ->
+ case lists:splitwith(fun federated_consumer/1, queue:to_list(Cs)) of
+ {_, []} -> queue:out(Cs);
+ {Feds, [UnFed|Tl]} -> queue:out(queue:from_list([UnFed | Feds ++ Tl]))
+ end.
+
+federated_consumer({_Pid, #consumer{args = Args}}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-purpose">>) of
+ {longstr, <<"federation">>} -> true;
+ _ -> false
end.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},