summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-07-06 18:21:28 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-07-06 18:21:28 +0100
commit1a36bd727ec792b0a9b75da731f27b2fb8a89a02 (patch)
treeceb32df981ba9684d3e5fd532f5ee0c81a0386ea
parenta47b7dba6201972a293dd3563466ad29f834e799 (diff)
downloadrabbitmq-server-1a36bd727ec792b0a9b75da731f27b2fb8a89a02.tar.gz
Store sync'ed slaves in Mnesia. This is not quite right yet.
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue.erl1
-rw-r--r--src/rabbit_amqqueue_process.erl44
-rw-r--r--src/rabbit_mirror_queue_misc.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl18
-rw-r--r--src/rabbit_upgrade_functions.erl16
6 files changed, 48 insertions, 46 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index e8b4a623..d6fac46d 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -47,7 +47,8 @@
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, mirror_nodes, policy}).
+ arguments, pid, slave_pids, sync_slave_pids, mirror_nodes,
+ policy}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index afbaea65..32a33812 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -215,6 +215,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none,
slave_pids = [],
+ sync_slave_pids = [],
mirror_nodes = MNodes}),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8933de87..9d03805a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -93,6 +93,7 @@
consumers,
memory,
slave_pids,
+ synchronised_slave_pids,
backing_queue_status
]).
@@ -102,9 +103,7 @@
durable,
auto_delete,
arguments,
- owner_pid,
- slave_pids,
- synchronised_slave_pids
+ owner_pid
]).
-define(INFO_KEYS,
@@ -893,37 +892,7 @@ make_dead_letter_msg(Reason,
now_micros() -> timer:now_diff(now(), {0,0,0}).
-infos(Items, State) ->
- {Prefix, Items1} =
- case lists:member(synchronised_slave_pids, Items) of
- true -> Prefix1 = slaves_status(State),
- case lists:member(slave_pids, Items) of
- true -> {Prefix1, Items -- [slave_pids]};
- false -> {proplists:delete(slave_pids, Prefix1), Items}
- end;
- false -> {[], Items}
- end,
- Prefix ++ [{Item, i(Item, State)}
- || Item <- (Items1 -- [synchronised_slave_pids])].
-
-slaves_status(#q{q = #amqqueue{name = Name}}) ->
- case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} ->
- [{slave_pids, ''}, {synchronised_slave_pids, ''}];
- {ok, #amqqueue{slave_pids = SPids}} ->
- {Results, _Bad} =
- delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1),
- {SPids1, SSPids} =
- lists:foldl(
- fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
- {[Pid | SPidsN],
- case proplists:get_bool(is_synchronised, Infos) of
- true -> [Pid | SSPidsN];
- false -> SSPidsN
- end}
- end, {[], []}, Results),
- [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
- end.
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
@@ -957,9 +926,14 @@ i(memory, _) ->
M;
i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} -> [];
+ {ok, #amqqueue{mirror_nodes = undefined}} -> '';
{ok, #amqqueue{slave_pids = SPids}} -> SPids
end;
+i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} -> '';
+ {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids
+ end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 180677fe..4d26a4d0 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -58,8 +58,9 @@ remove_from_queue(QueueName, DeadPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids }] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids}] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
not lists:member(node(Pid), DeadNodes)],
@@ -70,8 +71,11 @@ remove_from_queue(QueueName, DeadPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 },
+ SSPids1 = [SSPid || SSPid <- SSPids,
+ lists:member(SSPid, SPids1)],
+ Q1 = Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1,
+ sync_slave_pids = SSPids1},
ok = rabbit_amqqueue:store_queue(Q1),
{ok, QPid1, [QPid | SPids] -- Alive};
_ ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 03fafc3e..ee1d2b07 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -442,8 +442,6 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
msg_id_ack = MA,
msg_id_status = MS,
known_senders = KS }) ->
- rabbit_event:notify(queue_slave_promoted, [{pid, self()},
- {name, QName}]),
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
[rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
@@ -906,10 +904,20 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
%% We intentionally leave out the head where a slave becomes
%% unsynchronised: we assert that can never happen.
-set_synchronised(true, State = #state { q = #amqqueue { name = QName },
+set_synchronised(true, State = #state { q = Q = #amqqueue { name = QName },
synchronised = false }) ->
- rabbit_event:notify(queue_slave_synchronised, [{pid, self()},
- {name, QName}]),
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
+ Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
+ ok = rabbit_amqqueue:store_queue(Q2)
+ end
+ end),
+ rabbit_amqqueue:info(Q, [name]), %% Wake it up TODO this doesn't work
State #state { synchronised = true };
set_synchronised(true, State) ->
State;
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 18704807..8a44e03a 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -40,6 +40,7 @@
-rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}).
-rabbit_upgrade({policy, mnesia,
[exchange_scratches, ha_mirrors]}).
+-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
%% -------------------------------------------------------------------
@@ -62,7 +63,7 @@
-spec(topic_trie_node/0 :: () -> 'ok').
-spec(runtime_parameters/0 :: () -> 'ok').
-spec(policy/0 :: () -> 'ok').
-
+-spec(sync_slave_pids/0 :: () -> 'ok').
-endif.
%%--------------------------------------------------------------------
@@ -240,6 +241,19 @@ queue_policy(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid,
slave_pids, mirror_nodes, policy]).
+sync_slave_pids() ->
+ [ok = sync_slave_pids(T) || T <- [rabbit_queue, rabbit_durable_queue]].
+
+sync_slave_pids(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, N, D, AD, Excl, Args, Pid, SPids, MNodes, Pol}) ->
+ {amqqueue, N, D, AD, Excl, Args, Pid, SPids, [], MNodes, Pol}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid,
+ slave_pids, sync_slave_pids, mirror_nodes, policy]).
+
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->