summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-05-15 17:50:25 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-05-15 17:50:25 +0100
commit973200ef9f6bf82b73b523dba8d7b17911466f6a (patch)
treea7acf945fdbdd9a36ef65755eb1ee4b7a436b258
parent400abeb742b134c4a8ae6de0b4f9fe3b728ec043 (diff)
downloadrabbitmq-server-973200ef9f6bf82b73b523dba8d7b17911466f6a.tar.gz
Quick and dirty hooks for federated queues.
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_channel.erl4
2 files changed, 11 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d2f4a178..6eb81869 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -190,6 +190,10 @@ declare(Recover, From, State = #q{q = Q,
recovery_barrier(Recover),
State1 = process_args(State#q{backing_queue = BQ,
backing_queue_state = BQS}),
+ case Q#amqqueue.name#resource.name of
+ <<"test">> -> rabbit_federation_queue:start_link(Q);
+ _ -> ok
+ end,
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -518,11 +522,14 @@ discard(#delivery{sender = SenderPid,
end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-
-run_message_queue(State) ->
+run_message_queue(State = #q{q = Q}) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
is_empty(State), State),
+ case queue:len(State1#q.active_consumers) of
+ 0 -> rabbit_federation_queue:stop(Q);
+ _ -> rabbit_federation_queue:go(Q)
+ end,
State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 37041d34..69836204 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1219,8 +1219,8 @@ parse_credit_args(Arguments) ->
case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {boolean, Drain}} -> {Credit, Drain};
- _ -> none
+ {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
+ _ -> none
end;
undefined -> none
end.