summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-01 14:13:27 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-01 14:13:27 +0100
commitb428435664d171a998e8b51059a3739ec70e83ee (patch)
tree1fe2c0df6ba1676e1b86d1adedba75a8b4f29459
parent9da1f8a0965a560b3f1adabf1048ba2c2a4d4a7e (diff)
downloadrabbitmq-server-b428435664d171a998e8b51059a3739ec70e83ee.tar.gz
Ensure that if we fail over / reconfigure federation while running, we ask the qproc to remoind us what our running state is.
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl4
2 files changed, 9 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 61a1cdd5..7004a353 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/10, basic_cancel/4]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_federation/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -155,6 +155,7 @@
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
+-spec(notify_federation/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(resume/2 :: (pid(), pid()) -> 'ok').
@@ -563,6 +564,9 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+notify_federation(#amqqueue{pid = QPid}) ->
+ delegate:cast(QPid, notify_federation).
+
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
put(Key, case get(Key) of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a0c06598..edef9af0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1406,6 +1406,10 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
end
end);
+handle_cast(notify_federation, State) ->
+ notify_federation(State),
+ noreply(State);
+
handle_cast(wake_up, State) ->
noreply(State).