diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-07-06 18:21:28 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-07-06 18:21:28 +0100 |
commit | 1a36bd727ec792b0a9b75da731f27b2fb8a89a02 (patch) | |
tree | ceb32df981ba9684d3e5fd532f5ee0c81a0386ea | |
parent | a47b7dba6201972a293dd3563466ad29f834e799 (diff) | |
download | rabbitmq-server-1a36bd727ec792b0a9b75da731f27b2fb8a89a02.tar.gz |
Store sync'ed slaves in Mnesia. This is not quite right yet.
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 1 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 44 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 18 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 16 |
6 files changed, 48 insertions, 46 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index e8b4a623..d6fac46d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -47,7 +47,8 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, mirror_nodes, policy}). + arguments, pid, slave_pids, sync_slave_pids, mirror_nodes, + policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index afbaea65..32a33812 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -215,6 +215,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> exclusive_owner = Owner, pid = none, slave_pids = [], + sync_slave_pids = [], mirror_nodes = MNodes}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8933de87..9d03805a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -93,6 +93,7 @@ consumers, memory, slave_pids, + synchronised_slave_pids, backing_queue_status ]). @@ -102,9 +103,7 @@ durable, auto_delete, arguments, - owner_pid, - slave_pids, - synchronised_slave_pids + owner_pid ]). -define(INFO_KEYS, @@ -893,37 +892,7 @@ make_dead_letter_msg(Reason, now_micros() -> timer:now_diff(now(), {0,0,0}). -infos(Items, State) -> - {Prefix, Items1} = - case lists:member(synchronised_slave_pids, Items) of - true -> Prefix1 = slaves_status(State), - case lists:member(slave_pids, Items) of - true -> {Prefix1, Items -- [slave_pids]}; - false -> {proplists:delete(slave_pids, Prefix1), Items} - end; - false -> {[], Items} - end, - Prefix ++ [{Item, i(Item, State)} - || Item <- (Items1 -- [synchronised_slave_pids])]. - -slaves_status(#q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> - [{slave_pids, ''}, {synchronised_slave_pids, ''}]; - {ok, #amqqueue{slave_pids = SPids}} -> - {Results, _Bad} = - delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1), - {SPids1, SSPids} = - lists:foldl( - fun ({Pid, Infos}, {SPidsN, SSPidsN}) -> - {[Pid | SPidsN], - case proplists:get_bool(is_synchronised, Infos) of - true -> [Pid | SSPidsN]; - false -> SSPidsN - end} - end, {[], []}, Results), - [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}] - end. +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; @@ -957,9 +926,14 @@ i(memory, _) -> M; i(slave_pids, #q{q = #amqqueue{name = Name}}) -> case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> []; + {ok, #amqqueue{mirror_nodes = undefined}} -> ''; {ok, #amqqueue{slave_pids = SPids}} -> SPids end; +i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> ''; + {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids + end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 180677fe..4d26a4d0 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -58,8 +58,9 @@ remove_from_queue(QueueName, DeadPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids }] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids, + sync_slave_pids = SSPids}] -> [QPid1 | SPids1] = Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes)], @@ -70,8 +71,11 @@ remove_from_queue(QueueName, DeadPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }, + SSPids1 = [SSPid || SSPid <- SSPids, + lists:member(SSPid, SPids1)], + Q1 = Q #amqqueue { pid = QPid1, + slave_pids = SPids1, + sync_slave_pids = SSPids1}, ok = rabbit_amqqueue:store_queue(Q1), {ok, QPid1, [QPid | SPids] -- Alive}; _ -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 03fafc3e..ee1d2b07 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -442,8 +442,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, msg_id_ack = MA, msg_id_status = MS, known_senders = KS }) -> - rabbit_event:notify(queue_slave_promoted, [{pid, self()}, - {name, QName}]), rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, @@ -906,10 +904,20 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, %% We intentionally leave out the head where a slave becomes %% unsynchronised: we assert that can never happen. -set_synchronised(true, State = #state { q = #amqqueue { name = QName }, +set_synchronised(true, State = #state { q = Q = #amqqueue { name = QName }, synchronised = false }) -> - rabbit_event:notify(queue_slave_synchronised, [{pid, self()}, - {name, QName}]), + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> + Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, + ok = rabbit_amqqueue:store_queue(Q2) + end + end), + rabbit_amqqueue:info(Q, [name]), %% Wake it up TODO this doesn't work State #state { synchronised = true }; set_synchronised(true, State) -> State; diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 18704807..8a44e03a 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -40,6 +40,7 @@ -rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}). -rabbit_upgrade({policy, mnesia, [exchange_scratches, ha_mirrors]}). +-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). %% ------------------------------------------------------------------- @@ -62,7 +63,7 @@ -spec(topic_trie_node/0 :: () -> 'ok'). -spec(runtime_parameters/0 :: () -> 'ok'). -spec(policy/0 :: () -> 'ok'). - +-spec(sync_slave_pids/0 :: () -> 'ok'). -endif. %%-------------------------------------------------------------------- @@ -240,6 +241,19 @@ queue_policy(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, mirror_nodes, policy]). +sync_slave_pids() -> + [ok = sync_slave_pids(T) || T <- [rabbit_queue, rabbit_durable_queue]]. + +sync_slave_pids(Table) -> + transform( + Table, + fun ({amqqueue, N, D, AD, Excl, Args, Pid, SPids, MNodes, Pol}) -> + {amqqueue, N, D, AD, Excl, Args, Pid, SPids, [], MNodes, Pol} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, + slave_pids, sync_slave_pids, mirror_nodes, policy]). + + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |