summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-06-24 17:50:27 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-06-24 17:50:27 +0100
commit5f00855ba0ca440abe4efef80461b8aea1be3601 (patch)
treed0a54fec2f4dd2d5b41296670f4e312f30243de1
parent3b006dcd66be7a3c97e4c24ddfbd2d9903636070 (diff)
parent146b12abaed07a00fa48df620440df9ae30d9a6f (diff)
downloadrabbitmq-server-5f00855ba0ca440abe4efef80461b8aea1be3601.tar.gz
Merge in default
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl53
-rw-r--r--src/rabbit_channel.erl17
3 files changed, 63 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 767abeb0..61a1cdd5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/9, basic_cancel/4]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -149,9 +149,9 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/9 ::
+-spec(basic_consume/10 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any())
+ rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -296,6 +296,7 @@ store_queue(Q = #amqqueue{durable = false}) ->
policy_changed(Q1, Q2) ->
rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
+ rabbit_federation_queue:policy_changed(Q1, Q2),
%% Make sure we emit a stats event even if nothing
%% mirroring-related has changed - the policy may have changed anyway.
wake_up(Q1).
@@ -554,9 +555,10 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) ->
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) ->
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}).
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs,
+ OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c790a12d..bafc12dd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,7 +55,7 @@
status
}).
--record(consumer, {tag, ack_required}).
+-record(consumer, {tag, ack_required, args}).
%% These are held in our process dictionary
-record(cr, {ch_pid,
@@ -198,6 +198,7 @@ declare(Recover, From, State = #q{q = Q,
recovery_barrier(Recover),
State1 = process_args(State#q{backing_queue = BQ,
backing_queue_state = BQS}),
+ rabbit_federation_queue:maybe_start(Q),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -264,7 +265,7 @@ init_dlx_routing_key(RoutingKey, State) ->
init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
-terminate_shutdown(Fun, State) ->
+terminate_shutdown(Fun, State = #q{q = Q}) ->
State1 = #q{backing_queue_state = BQS} =
lists:foldl(fun (F, S) -> F(S) end, State,
[fun stop_sync_timer/1,
@@ -275,6 +276,7 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
+ rabbit_federation_queue:terminate(Q),
[emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
@@ -430,7 +432,7 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case queue:out(ActiveConsumers) of
+ case pick_consumer(ActiveConsumers) of
{empty, _} ->
{false, State};
{{value, QEntry}, Tail} ->
@@ -444,12 +446,14 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
+ notify_federation(State),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
+ notify_federation(State),
{false, State};
{continue, Limiter} ->
AC1 = queue:in(E, State#q.active_consumers),
@@ -531,8 +535,42 @@ run_message_queue(State) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
is_empty(State), State),
+ notify_federation(State1),
State1.
+notify_federation(#q{q = Q,
+ active_consumers = ActiveConsumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ IsEmpty = BQ:is_empty(BQS),
+ case IsEmpty andalso active_unfederated(ActiveConsumers) of
+ true -> rabbit_federation_queue:run(Q);
+ false -> rabbit_federation_queue:stop(Q)
+ end.
+
+active_unfederated(Cs) ->
+ case queue:out(Cs) of
+ {empty, _} -> false;
+ {{value, C}, Cs1} -> case federated_consumer(C) of
+ true -> active_unfederated(Cs1);
+ false -> true
+ end
+ end.
+
+%% TODO this could be more efficient. But we'd need another representation,
+%% and thus to abstract the representation of active_consumers.
+pick_consumer(Cs) ->
+ case lists:splitwith(fun federated_consumer/1, queue:to_list(Cs)) of
+ {_, []} -> queue:out(Cs);
+ {Feds, [UnFed|Tl]} -> queue:out(queue:from_list([UnFed | Feds ++ Tl]))
+ end.
+
+federated_consumer({_Pid, #consumer{args = Args}}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-purpose">>) of
+ {longstr, <<"federation">>} -> true;
+ _ -> false
+ end.
+
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -1114,11 +1152,12 @@ handle_call({notify_down, ChPid}, _From, State) ->
end;
handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
- State = #q{q = #amqqueue{name = QName}}) ->
+ State = #q{q = Q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
case fetch(AckRequired, State1) of
{empty, State2} ->
+ rabbit_federation_queue:basic_get(Q),
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
@@ -1135,7 +1174,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
_From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
@@ -1159,7 +1198,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
false -> ok
end,
Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck},
+ ack_required = not NoAck,
+ args = OtherArgs},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
end,
@@ -1199,6 +1239,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.active_consumers)},
+ notify_federation(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 22692dcb..da4d66c3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -734,7 +734,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait,
- arguments = Arguments},
+ arguments = Args},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
@@ -755,12 +755,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
+ {CreditArgs, OtherArgs} = parse_credit_args(Args),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
- parse_credit_args(Arguments),
+ CreditArgs, OtherArgs,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -1224,12 +1225,12 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
parse_credit_args(Arguments) ->
case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
- {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {boolean, Drain}} -> {Credit, Drain};
- _ -> none
- end;
- undefined -> none
+ {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
+ _ -> none
+ end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
+ undefined -> {none, Arguments}
end.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,