summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-28 17:41:52 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-28 17:41:52 +0100
commita8ff68e6b4a856778897d8a1c357a1bf6cfcb1f3 (patch)
tree908de780e19a373d309f4ede18f3e5b82ca5ceb7
parentb8f6018e0dc8ee6f5e67ee1d58fafa938185c42d (diff)
parentdbe5902eebece9ef99c8688634fe32558a23a10c (diff)
downloadrabbitmq-server-bug23968.tar.gz
Merge default into bug23968bug23968
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/debs/Debian/debian/control5
-rw-r--r--src/gm.erl134
-rw-r--r--src/gm_speed_test.erl82
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_binding.erl23
-rw-r--r--src/rabbit_channel.erl1
-rw-r--r--src/rabbit_error_logger.erl6
-rw-r--r--src/rabbit_misc.erl1
-rw-r--r--src/rabbit_mnesia.erl127
-rw-r--r--src/rabbit_msg_store.erl47
-rw-r--r--src/rabbit_prelaunch.erl2
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_upgrade.erl331
-rw-r--r--src/rabbit_upgrade_functions.erl12
-rw-r--r--src/rabbit_variable_queue.erl13
-rw-r--r--src/rabbit_version.erl172
18 files changed, 727 insertions, 245 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index ae9b2059..45af770a 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -120,6 +120,9 @@ done
rm -rf %{buildroot}
%changelog
+* Tue Mar 22 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.0-1
+- New Upstream Release
+
* Thu Feb 3 2011 simon@rabbitmq.com 2.3.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 12165dc0..2ca5074f 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.4.0-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Alexandru Scvortov <alexandru@rabbitmq.com> Tue, 22 Mar 2011 17:34:31 +0000
+
rabbitmq-server (2.3.1-1) lucid; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 02da0cc6..45f5c5c4 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -7,10 +7,7 @@ Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-# erlang-inets is not a strict dependency, but it's needed to allow
-# the installation of plugins that use mochiweb. Ideally it would be a
-# "Recommends" instead, but gdebi does not install those.
-Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), erlang-inets | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
+Depends: erlang-nox (>= 1:12.b.3), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/src/gm.erl b/src/gm.erl
index 8cf22581..5b3623cf 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -376,15 +376,16 @@
confirmed_broadcast/2, group_members/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, prioritise_info/2]).
+ code_change/3, prioritise_cast/2, prioritise_info/2]).
-export([behaviour_info/1]).
--export([table_definitions/0]).
+-export([table_definitions/0, flush/1]).
-define(GROUP_TABLE, gm_group).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(BROADCAST_TIMER, 25).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -398,7 +399,9 @@
pub_count,
members_state,
callback_args,
- confirms
+ confirms,
+ broadcast_buffer,
+ broadcast_timer
}).
-record(gm_group, { name, version, members }).
@@ -508,21 +511,26 @@ confirmed_broadcast(Server, Msg) ->
group_members(Server) ->
gen_server2:call(Server, group_members, infinity).
+flush(Server) ->
+ gen_server2:cast(Server, flush).
+
init([GroupName, Module, Args]) ->
random:seed(now()),
gen_server2:cast(self(), join),
Self = self(),
- {ok, #state { self = Self,
- left = {Self, undefined},
- right = {Self, undefined},
- group_name = GroupName,
- module = Module,
- view = undefined,
- pub_count = 0,
- members_state = undefined,
- callback_args = Args,
- confirms = queue:new() }, hibernate,
+ {ok, #state { self = Self,
+ left = {Self, undefined},
+ right = {Self, undefined},
+ group_name = GroupName,
+ module = Module,
+ view = undefined,
+ pub_count = 0,
+ members_state = undefined,
+ callback_args = Args,
+ confirms = queue:new(),
+ broadcast_buffer = [],
+ broadcast_timer = undefined }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -620,7 +628,11 @@ handle_cast(join, State = #state { self = Self,
{Module:joined(Args, all_known_members(View)), State1});
handle_cast(leave, State) ->
- {stop, normal, State}.
+ {stop, normal, State};
+
+handle_cast(flush, State) ->
+ noreply(
+ flush_broadcast_buffer(State #state { broadcast_timer = undefined })).
handle_info({'DOWN', MRef, process, _Pid, _Reason},
@@ -662,14 +674,17 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
end.
-terminate(Reason, #state { module = Module,
- callback_args = Args }) ->
+terminate(Reason, State = #state { module = Module,
+ callback_args = Args }) ->
+ flush_broadcast_buffer(State),
Module:terminate(Args, Reason).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+prioritise_cast(flush, _State) -> 1;
+prioritise_cast(_ , _State) -> 0.
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
prioritise_info(_ , _State) -> 0.
@@ -782,33 +797,62 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
- {noreply, State, hibernate}.
+ {noreply, ensure_broadcast_timer(State), hibernate}.
reply(Reply, State) ->
- {reply, Reply, State, hibernate}.
-
-internal_broadcast(Msg, From, State = #state { self = Self,
- pub_count = PubCount,
- members_state = MembersState,
- module = Module,
- confirms = Confirms,
- callback_args = Args }) ->
- PubMsg = {PubCount, Msg},
- Activity = activity_cons(Self, [PubMsg], [], activity_nil()),
- ok = maybe_send_activity(activity_finalise(Activity), State),
- MembersState1 =
- with_member(
- fun (Member = #member { pending_ack = PA }) ->
- Member #member { pending_ack = queue:in(PubMsg, PA) }
- end, Self, MembersState),
+ {reply, Reply, ensure_broadcast_timer(State), hibernate}.
+
+ensure_broadcast_timer(State = #state { broadcast_buffer = [],
+ broadcast_timer = undefined }) ->
+ State;
+ensure_broadcast_timer(State = #state { broadcast_buffer = [],
+ broadcast_timer = TRef }) ->
+ timer:cancel(TRef),
+ State #state { broadcast_timer = undefined };
+ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
+ {ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]),
+ State #state { broadcast_timer = TRef };
+ensure_broadcast_timer(State) ->
+ State.
+
+internal_broadcast(Msg, From, State = #state { self = Self,
+ pub_count = PubCount,
+ module = Module,
+ confirms = Confirms,
+ callback_args = Args,
+ broadcast_buffer = Buffer }) ->
+ Result = Module:handle_msg(Args, Self, Msg),
+ Buffer1 = [{PubCount, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
_ -> queue:in({PubCount, From}, Confirms)
end,
- handle_callback_result({Module:handle_msg(Args, Self, Msg),
- State #state { pub_count = PubCount + 1,
- members_state = MembersState1,
- confirms = Confirms1 }}).
+ State1 = State #state { pub_count = PubCount + 1,
+ confirms = Confirms1,
+ broadcast_buffer = Buffer1 },
+ case From =/= none of
+ true ->
+ handle_callback_result({Result, flush_broadcast_buffer(State1)});
+ false ->
+ handle_callback_result(
+ {Result, State1 #state { broadcast_buffer = Buffer1 }})
+ end.
+
+flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
+ State;
+flush_broadcast_buffer(State = #state { self = Self,
+ members_state = MembersState,
+ broadcast_buffer = Buffer }) ->
+ Pubs = lists:reverse(Buffer),
+ Activity = activity_cons(Self, Pubs, [], activity_nil()),
+ ok = maybe_send_activity(activity_finalise(Activity), State),
+ MembersState1 = with_member(
+ fun (Member = #member { pending_ack = PA }) ->
+ PA1 = queue:join(PA, queue:from_list(Pubs)),
+ Member #member { pending_ack = PA1 }
+ end, Self, MembersState),
+ State #state { members_state = MembersState1,
+ broadcast_buffer = [] }.
%% ---------------------------------------------------------------------------
@@ -1093,16 +1137,22 @@ maybe_monitor(Self, Self) ->
maybe_monitor(Other, _Self) ->
erlang:monitor(process, Other).
-check_neighbours(State = #state { self = Self,
- left = Left,
- right = Right,
- view = View }) ->
+check_neighbours(State = #state { self = Self,
+ left = Left,
+ right = Right,
+ view = View,
+ broadcast_buffer = Buffer }) ->
#view_member { left = VLeft, right = VRight }
= fetch_view_member(Self, View),
Ver = view_version(View),
Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
Right1 = ensure_neighbour(Ver, Self, Right, VRight),
- State1 = State #state { left = Left1, right = Right1 },
+ Buffer1 = case Right1 of
+ {Self, undefined} -> [];
+ _ -> Buffer
+ end,
+ State1 = State #state { left = Left1, right = Right1,
+ broadcast_buffer = Buffer1 },
ok = maybe_send_catchup(Right, State1),
State1.
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
new file mode 100644
index 00000000..defb0f29
--- /dev/null
+++ b/src/gm_speed_test.erl
@@ -0,0 +1,82 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(gm_speed_test).
+
+-export([test/3]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+-export([wile_e_coyote/2]).
+
+-behaviour(gm).
+
+-include("gm_specs.hrl").
+
+%% callbacks
+
+joined(Owner, _Members) ->
+ Owner ! joined,
+ ok.
+
+members_changed(_Owner, _Births, _Deaths) ->
+ ok.
+
+handle_msg(Owner, _From, ping) ->
+ Owner ! ping,
+ ok.
+
+terminate(Owner, _Reason) ->
+ Owner ! terminated,
+ ok.
+
+%% other
+
+wile_e_coyote(Time, WriteUnit) ->
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ receive joined -> ok end,
+ timer:sleep(1000), %% wait for all to join
+ timer:send_after(Time, stop),
+ Start = now(),
+ {Sent, Received} = loop(Pid, WriteUnit, 0, 0),
+ End = now(),
+ ok = gm:leave(Pid),
+ receive terminated -> ok end,
+ Elapsed = timer:now_diff(End, Start) / 1000000,
+ io:format("Sending rate: ~p msgs/sec~nReceiving rate: ~p msgs/sec~n~n",
+ [Sent/Elapsed, Received/Elapsed]),
+ ok.
+
+loop(Pid, WriteUnit, Sent, Received) ->
+ case read(Received) of
+ {stop, Received1} -> {Sent, Received1};
+ {ok, Received1} -> ok = write(Pid, WriteUnit),
+ loop(Pid, WriteUnit, Sent + WriteUnit, Received1)
+ end.
+
+read(Count) ->
+ receive
+ ping -> read(Count + 1);
+ stop -> {stop, Count}
+ after 5 ->
+ {ok, Count}
+ end.
+
+write(_Pid, 0) -> ok;
+write(Pid, N) -> ok = gm:broadcast(Pid, ping),
+ write(Pid, N - 1).
+
+test(Time, WriteUnit, Nodes) ->
+ ok = gm:create_tables(),
+ [spawn(Node, ?MODULE, wile_e_coyote, [Time, WriteUnit]) || Node <- Nodes].
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c9a929ae..807e9e7d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -192,7 +192,8 @@
%%----------------------------------------------------------------------------
prepare() ->
- ok = ensure_working_log_handlers().
+ ok = ensure_working_log_handlers(),
+ ok = rabbit_upgrade:maybe_upgrade_mnesia().
start() ->
try
@@ -233,6 +234,7 @@ rotate_logs(BinarySuffix) ->
start(normal, []) ->
case erts_version_check() of
ok ->
+ ok = rabbit_mnesia:delete_previously_running_nodes(),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
@@ -245,6 +247,7 @@ start(normal, []) ->
end.
stop(_State) ->
+ ok = rabbit_mnesia:record_running_nodes(),
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 7ddb7814..6167790e 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -331,17 +331,18 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
maybe_auto_delete(XName, Bindings, Deletions) ->
- case mnesia:read({rabbit_exchange, XName}) of
- [] ->
- add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions);
- [X] ->
- add_deletion(XName, {X, not_deleted, Bindings},
- case rabbit_exchange:maybe_auto_delete(X) of
- not_deleted -> Deletions;
- {deleted, Deletions1} -> combine_deletions(
- Deletions, Deletions1)
- end)
- end.
+ {Entry, Deletions1} =
+ case mnesia:read({rabbit_exchange, XName}) of
+ [] -> {{undefined, not_deleted, Bindings}, Deletions};
+ [X] -> case rabbit_exchange:maybe_auto_delete(X) of
+ not_deleted ->
+ {{X, not_deleted, Bindings}, Deletions};
+ {deleted, Deletions2} ->
+ {{X, deleted, Bindings},
+ combine_deletions(Deletions, Deletions2)}
+ end
+ end,
+ add_deletion(XName, Entry, Deletions1).
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0c12614c..5099bf3f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -156,7 +156,6 @@ ready_for_close(Pid) ->
init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
Capabilities, CollectorPid, StartLimiterFun]) ->
- process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 0120f0d6..3fb0817a 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -67,8 +67,12 @@ publish(_Other, _Format, _Data, _State) ->
ok.
publish1(RoutingKey, Format, Data, LogExch) ->
+ %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's
+ %% second resolution, not millisecond.
+ Timestamp = rabbit_misc:now_ms() div 1000,
{ok, _RoutingRes, _DeliveredQPids} =
rabbit_basic:publish(LogExch, RoutingKey, false, false, none,
- #'P_basic'{content_type = <<"text/plain">>},
+ #'P_basic'{content_type = <<"text/plain">>,
+ timestamp = Timestamp},
list_to_binary(io_lib:format(Format, Data))),
ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index e79a58a1..2e9563cf 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -871,4 +871,3 @@ is_process_alive(Pid) ->
true -> true;
_ -> false
end.
-
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 66436920..fbcf07ae 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -18,9 +18,12 @@
-module(rabbit_mnesia).
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
- cluster/1, force_cluster/1, reset/0, force_reset/0,
+ cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3,
is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]).
+ empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
+ create_cluster_nodes_config/1, read_cluster_nodes_config/0,
+ record_running_nodes/0, read_previously_running_nodes/0,
+ delete_previously_running_nodes/0, running_nodes_filename/0]).
-export([table_names/0]).
@@ -42,6 +45,7 @@
-spec(dir/0 :: () -> file:filename()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
+-spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok').
-spec(is_db_empty/0 :: () -> boolean()).
-spec(cluster/1 :: ([node()]) -> 'ok').
-spec(force_cluster/1 :: ([node()]) -> 'ok').
@@ -55,6 +59,12 @@
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
+-spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok').
+-spec(read_cluster_nodes_config/0 :: () -> [node()]).
+-spec(record_running_nodes/0 :: () -> 'ok').
+-spec(read_previously_running_nodes/0 :: () -> [node()]).
+-spec(delete_previously_running_nodes/0 :: () -> 'ok').
+-spec(running_nodes_filename/0 :: () -> file:filename()).
-endif.
@@ -78,9 +88,10 @@ status() ->
{running_nodes, running_clustered_nodes()}].
init() ->
- ok = ensure_mnesia_running(),
- ok = ensure_mnesia_dir(),
- ok = init_db(read_cluster_nodes_config(), true),
+ ensure_mnesia_running(),
+ ensure_mnesia_dir(),
+ ok = init_db(read_cluster_nodes_config(), true,
+ fun maybe_upgrade_local_or_record_desired/0),
ok.
is_db_empty() ->
@@ -98,11 +109,12 @@ force_cluster(ClusterNodes) ->
%% node. If Force is false, only connections to online nodes are
%% allowed.
cluster(ClusterNodes, Force) ->
- ok = ensure_mnesia_not_running(),
- ok = ensure_mnesia_dir(),
+ ensure_mnesia_not_running(),
+ ensure_mnesia_dir(),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
try
- ok = init_db(ClusterNodes, Force),
+ ok = init_db(ClusterNodes, Force,
+ fun maybe_upgrade_local_or_record_desired/0),
ok = create_cluster_nodes_config(ClusterNodes)
after
mnesia:stop()
@@ -367,11 +379,40 @@ delete_cluster_nodes_config() ->
FileName, Reason}})
end.
+running_nodes_filename() ->
+ filename:join(dir(), "nodes_running_at_shutdown").
+
+record_running_nodes() ->
+ FileName = running_nodes_filename(),
+ Nodes = running_clustered_nodes() -- [node()],
+ %% Don't check the result: we're shutting down anyway and this is
+ %% a best-effort-basis.
+ rabbit_misc:write_term_file(FileName, [Nodes]),
+ ok.
+
+read_previously_running_nodes() ->
+ FileName = running_nodes_filename(),
+ case rabbit_misc:read_term_file(FileName) of
+ {ok, [Nodes]} -> Nodes;
+ {error, enoent} -> [];
+ {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
+ FileName, Reason}})
+ end.
+
+delete_previously_running_nodes() ->
+ FileName = running_nodes_filename(),
+ case file:delete(FileName) of
+ ok -> ok;
+ {error, enoent} -> ok;
+ {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
+ FileName, Reason}})
+ end.
+
%% Take a cluster node config and create the right kind of node - a
%% standalone disk node, or disk or ram node connected to the
%% specified cluster nodes. If Force is false, don't allow
%% connections to offline nodes.
-init_db(ClusterNodes, Force) ->
+init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
UClusterNodes = lists:usort(ClusterNodes),
ProperClusterNodes = UClusterNodes -- [node()],
case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
@@ -387,26 +428,21 @@ init_db(ClusterNodes, Force) ->
end;
true -> ok
end,
- case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of
- {[], true, [_]} ->
- %% True single disc node, attempt upgrade
- case rabbit_upgrade:maybe_upgrade() of
- ok -> ensure_schema_integrity();
- version_not_available -> schema_ok_or_move()
- end;
- {[], true, _} ->
- %% "Master" (i.e. without config) disc node in cluster,
- %% verify schema
- ensure_version_ok(rabbit_upgrade:read_version()),
- ensure_schema_integrity();
- {[], false, _} ->
+ case {Nodes, mnesia:system_info(use_dir)} of
+ {[], false} ->
%% Nothing there at all, start from scratch
ok = create_schema();
- {[AnotherNode|_], _, _} ->
+ {[], true} ->
+ %% We're the first node up
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ensure_schema_integrity();
+ version_not_available -> ok = schema_ok_or_move()
+ end,
+ ok;
+ {[AnotherNode|_], _} ->
%% Subsequent node in cluster, catch up
- ensure_version_ok(rabbit_upgrade:read_version()),
ensure_version_ok(
- rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
+ rpc:call(AnotherNode, rabbit_version, recorded, [])),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
ok = wait_for_replicated_tables(),
@@ -415,7 +451,9 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
- ensure_schema_integrity()
+ ok = SecondaryPostMnesiaFun(),
+ ensure_schema_integrity(),
+ ok
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -424,6 +462,14 @@ init_db(ClusterNodes, Force) ->
throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
end.
+maybe_upgrade_local_or_record_desired() ->
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ok;
+ %% If we're just starting up a new node we won't have a
+ %% version
+ version_not_available -> ok = rabbit_version:record_desired()
+ end.
+
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -440,13 +486,13 @@ schema_ok_or_move() ->
end.
ensure_version_ok({ok, DiscVersion}) ->
- case rabbit_upgrade:desired_version() of
- DiscVersion -> ok;
- DesiredVersion -> throw({error, {schema_mismatch,
- DesiredVersion, DiscVersion}})
+ DesiredVersion = rabbit_version:desired(),
+ case rabbit_version:matches(DesiredVersion, DiscVersion) of
+ true -> ok;
+ false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}})
end;
ensure_version_ok({error, _}) ->
- ok = rabbit_upgrade:write_version().
+ ok = rabbit_version:record_desired().
create_schema() ->
mnesia:stop(),
@@ -455,8 +501,8 @@ create_schema() ->
rabbit_misc:ensure_ok(mnesia:start(),
cannot_start_mnesia),
ok = create_tables(),
- ok = ensure_schema_integrity(),
- ok = rabbit_upgrade:write_version().
+ ensure_schema_integrity(),
+ ok = rabbit_version:record_desired().
move_db() ->
mnesia:stop(),
@@ -476,18 +522,13 @@ move_db() ->
{error, Reason} -> throw({error, {cannot_backup_mnesia,
MnesiaDir, BackupDir, Reason}})
end,
- ok = ensure_mnesia_dir(),
+ ensure_mnesia_dir(),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
ok.
copy_db(Destination) ->
- mnesia:stop(),
- case rabbit_misc:recursive_copy(dir(), Destination) of
- ok ->
- rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia);
- {error, E} ->
- {error, E}
- end.
+ ok = ensure_mnesia_not_running(),
+ rabbit_misc:recursive_copy(dir(), Destination).
create_tables() ->
lists:foreach(fun ({Tab, TabDef}) ->
@@ -561,12 +602,12 @@ wait_for_tables(TableNames) ->
end.
reset(Force) ->
- ok = ensure_mnesia_not_running(),
+ ensure_mnesia_not_running(),
Node = node(),
case Force of
true -> ok;
false ->
- ok = ensure_mnesia_dir(),
+ ensure_mnesia_dir(),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
{Nodes, RunningNodes} =
try
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index bc68d2cd..e3aae572 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -820,15 +820,16 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State1 = case CurHdl of
undefined -> State;
_ -> State2 = internal_sync(State),
- file_handle_cache:close(CurHdl),
+ ok = file_handle_cache:close(CurHdl),
State2
end,
State3 = close_all_handles(State1),
- store_file_summary(FileSummaryEts, Dir),
- [ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
+ ok = store_file_summary(FileSummaryEts, Dir),
+ [true = ets:delete(T) ||
+ T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
- store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
- {index_module, IndexModule}], Dir),
+ ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
+ {index_module, IndexModule}], Dir),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -881,13 +882,16 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
false -> [{CRef, MsgIds} | NS]
end
end, [], CTM),
- case {Syncs, CGs} of
- {[], []} -> ok;
- _ -> file_handle_cache:sync(CurHdl)
- end,
+ ok = case {Syncs, CGs} of
+ {[], []} -> ok;
+ _ -> file_handle_cache:sync(CurHdl)
+ end,
[K() || K <- lists:reverse(Syncs)],
- [client_confirm(CRef, MsgIds, written, State1) || {CRef, MsgIds} <- CGs],
- State1 #msstate { cref_to_msg_ids = dict:new(), on_sync = [] }.
+ State2 = lists:foldl(
+ fun ({CRef, MsgIds}, StateN) ->
+ client_confirm(CRef, MsgIds, written, StateN)
+ end, State1, CGs),
+ State2 #msstate { on_sync = [] }.
write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
@@ -1384,7 +1388,7 @@ recover_file_summary(false, _Dir) ->
recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
- {ok, Tid} -> file:delete(Path),
+ {ok, Tid} -> ok = file:delete(Path),
{true, Tid};
{error, _Error} -> recover_file_summary(false, Dir)
end.
@@ -1451,9 +1455,7 @@ scan_file_for_valid_messages(Dir, FileName) ->
Hdl, filelib:file_size(
form_filename(Dir, FileName)),
fun scan_fun/2, []),
- %% if something really bad has happened,
- %% the close could fail, but ignore
- file_handle_cache:close(Hdl),
+ ok = file_handle_cache:close(Hdl),
Valid;
{error, enoent} -> {ok, [], 0};
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
@@ -1889,32 +1891,33 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
force_recovery(BaseDir, Store) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
- file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
+ ok = file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
recover_crashed_compactions(BaseDir),
ok.
foreach_file(D, Fun, Files) ->
- [Fun(filename:join(D, File)) || File <- Files].
+ [ok = Fun(filename:join(D, File)) || File <- Files].
foreach_file(D1, D2, Fun, Files) ->
- [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
+ [ok = Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
transform_dir(BaseDir, Store, TransformFun) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end,
+ CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end,
case filelib:is_dir(TmpDir) of
true -> throw({error, transform_failed_previously});
false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
foreach_file(Dir, TmpDir, TransformFile, FileList),
foreach_file(Dir, fun file:delete/1, FileList),
- foreach_file(TmpDir, Dir, fun file:copy/2, FileList),
+ foreach_file(TmpDir, Dir, CopyFile, FileList),
foreach_file(TmpDir, fun file:delete/1, FileList),
ok = file:del_dir(TmpDir)
end.
transform_msg_file(FileOld, FileNew, TransformFun) ->
- rabbit_misc:ensure_parent_dirs_exist(FileNew),
+ ok = rabbit_misc:ensure_parent_dirs_exist(FileNew),
{ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
{ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
[{write_buffer,
@@ -1927,6 +1930,6 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
{ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
ok
end, ok),
- file_handle_cache:close(RefOld),
- file_handle_cache:close(RefNew),
+ ok = file_handle_cache:close(RefOld),
+ ok = file_handle_cache:close(RefNew),
ok.
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 7bb8c0ea..8800e8d6 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -235,7 +235,7 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
+process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) ->
[{apply,{rabbit,prepare,[]}}, Entry];
process_entry(Entry) ->
[Entry].
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 33c5391b..367953b8 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -169,7 +169,7 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({add_queue_ttl, []}).
+-rabbit_upgrade({add_queue_ttl, local, []}).
-ifdef(use_specs).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index ebda5d03..a2abb1e5 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -16,7 +16,7 @@
-module(rabbit_upgrade).
--export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]).
+-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]).
-include("rabbit.hrl").
@@ -27,141 +27,260 @@
-ifdef(use_specs).
--type(step() :: atom()).
--type(version() :: [step()]).
-
--spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
--spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
--spec(write_version/0 :: () -> 'ok').
--spec(desired_version/0 :: () -> version()).
+-spec(maybe_upgrade_mnesia/0 :: () -> 'ok').
+-spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available').
-endif.
%% -------------------------------------------------------------------
-%% Try to upgrade the schema. If no information on the existing schema
-%% could be found, do nothing. rabbit_mnesia:check_schema_integrity()
-%% will catch the problem.
-maybe_upgrade() ->
- case read_version() of
- {ok, CurrentHeads} ->
- with_upgrade_graph(
- fun (G) ->
- case unknown_heads(CurrentHeads, G) of
- [] -> case upgrades_to_apply(CurrentHeads, G) of
- [] -> ok;
- Upgrades -> apply_upgrades(Upgrades)
- end;
- Unknown -> throw({error,
- {future_upgrades_found, Unknown}})
- end
- end);
- {error, enoent} ->
- version_not_available
+%% The upgrade logic is quite involved, due to the existence of
+%% clusters.
+%%
+%% Firstly, we have two different types of upgrades to do: Mnesia and
+%% everythinq else. Mnesia upgrades must only be done by one node in
+%% the cluster (we treat a non-clustered node as a single-node
+%% cluster). This is the primary upgrader. The other upgrades need to
+%% be done by all nodes.
+%%
+%% The primary upgrader has to start first (and do its Mnesia
+%% upgrades). Secondary upgraders need to reset their Mnesia database
+%% and then rejoin the cluster. They can't do the Mnesia upgrades as
+%% well and then merge databases since the cookie for each table will
+%% end up different and the merge will fail.
+%%
+%% This in turn means that we need to determine whether we are the
+%% primary or secondary upgrader *before* Mnesia comes up. If we
+%% didn't then the secondary upgrader would try to start Mnesia, and
+%% either hang waiting for a node which is not yet up, or fail since
+%% its schema differs from the other nodes in the cluster.
+%%
+%% Also, the primary upgrader needs to start Mnesia to do its
+%% upgrades, but needs to forcibly load tables rather than wait for
+%% them (in case it was not the last node to shut down, in which case
+%% it would wait forever).
+%%
+%% This in turn means that maybe_upgrade_mnesia/0 has to be patched
+%% into the boot process by prelaunch before the mnesia application is
+%% started. By the time Mnesia is started the upgrades have happened
+%% (on the primary), or Mnesia has been reset (on the secondary) and
+%% rabbit_mnesia:init_db/3 can then make the node rejoin the cluster
+%% in the normal way.
+%%
+%% The non-mnesia upgrades are then triggered by
+%% rabbit_mnesia:init_db/3. Of course, it's possible for a given
+%% upgrade process to only require Mnesia upgrades, or only require
+%% non-Mnesia upgrades. In the latter case no Mnesia resets and
+%% reclusterings occur.
+%%
+%% The primary upgrader needs to be a disc node. Ideally we would like
+%% it to be the last disc node to shut down (since otherwise there's a
+%% risk of data loss). On each node we therefore record the disc nodes
+%% that were still running when we shut down. A disc node that knows
+%% other nodes were up when it shut down, or a ram node, will refuse
+%% to be the primary upgrader, and will thus not start when upgrades
+%% are needed.
+%%
+%% However, this is racy if several nodes are shut down at once. Since
+%% rabbit records the running nodes, and shuts down before mnesia, the
+%% race manifests as all disc nodes thinking they are not the primary
+%% upgrader. Therefore the user can remove the record of the last disc
+%% node to shut down to get things going again. This may lose any
+%% mnesia changes that happened after the node chosen as the primary
+%% upgrader was shut down.
+
+%% -------------------------------------------------------------------
+
+ensure_backup_taken() ->
+ case filelib:is_file(lock_filename()) of
+ false -> case filelib:is_dir(backup_dir()) of
+ false -> ok = take_backup();
+ _ -> ok
+ end;
+ true -> throw({error, previous_upgrade_failed})
end.
-read_version() ->
- case rabbit_misc:read_term_file(schema_filename()) of
- {ok, [Heads]} -> {ok, Heads};
- {error, _} = Err -> Err
+take_backup() ->
+ BackupDir = backup_dir(),
+ case rabbit_mnesia:copy_db(BackupDir) of
+ ok -> info("upgrades: Mnesia dir backed up to ~p~n",
+ [BackupDir]);
+ {error, E} -> throw({could_not_back_up_mnesia_dir, E})
end.
-write_version() ->
- ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]),
- ok.
+ensure_backup_removed() ->
+ case filelib:is_dir(backup_dir()) of
+ true -> ok = remove_backup();
+ _ -> ok
+ end.
-desired_version() ->
- with_upgrade_graph(fun (G) -> heads(G) end).
+remove_backup() ->
+ ok = rabbit_misc:recursive_delete([backup_dir()]),
+ info("upgrades: Mnesia backup removed~n", []).
-%% -------------------------------------------------------------------
+maybe_upgrade_mnesia() ->
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
+ case rabbit_version:upgrades_required(mnesia) of
+ {error, version_not_available} ->
+ case AllNodes of
+ [_] -> ok;
+ _ -> die("Cluster upgrade needed but upgrading from "
+ "< 2.1.1.~nUnfortunately you will need to "
+ "rebuild the cluster.", [])
+ end;
+ {error, _} = Err ->
+ throw(Err);
+ {ok, []} ->
+ ok;
+ {ok, Upgrades} ->
+ ensure_backup_taken(),
+ ok = case upgrade_mode(AllNodes) of
+ primary -> primary_upgrade(Upgrades, AllNodes);
+ secondary -> secondary_upgrade(AllNodes)
+ end
+ end.
-with_upgrade_graph(Fun) ->
- case rabbit_misc:build_acyclic_graph(
- fun vertices/2, fun edges/2,
- rabbit_misc:all_module_attributes(rabbit_upgrade)) of
- {ok, G} -> try
- Fun(G)
- after
- true = digraph:delete(G)
- end;
- {error, {vertex, duplicate, StepName}} ->
- throw({error, {duplicate_upgrade_step, StepName}});
- {error, {edge, {bad_vertex, StepName}, _From, _To}} ->
- throw({error, {dependency_on_unknown_upgrade_step, StepName}});
- {error, {edge, {bad_edge, StepNames}, _From, _To}} ->
- throw({error, {cycle_in_upgrade_steps, StepNames}})
+upgrade_mode(AllNodes) ->
+ case nodes_running(AllNodes) of
+ [] ->
+ AfterUs = rabbit_mnesia:read_previously_running_nodes(),
+ case {is_disc_node(), AfterUs} of
+ {true, []} ->
+ primary;
+ {true, _} ->
+ Filename = rabbit_mnesia:running_nodes_filename(),
+ die("Cluster upgrade needed but other disc nodes shut "
+ "down after this one.~nPlease first start the last "
+ "disc node to shut down.~n~nNote: if several disc "
+ "nodes were shut down simultaneously they may "
+ "all~nshow this message. In which case, remove "
+ "the lock file on one of them and~nstart that node. "
+ "The lock file on this node is:~n~n ~s ", [Filename]);
+ {false, _} ->
+ die("Cluster upgrade needed but this is a ram node.~n"
+ "Please first start the last disc node to shut down.",
+ [])
+ end;
+ [Another|_] ->
+ MyVersion = rabbit_version:desired_for_scope(mnesia),
+ ErrFun = fun (ClusterVersion) ->
+ %% The other node(s) are running an
+ %% unexpected version.
+ die("Cluster upgrade needed but other nodes are "
+ "running ~p~nand I want ~p",
+ [ClusterVersion, MyVersion])
+ end,
+ case rpc:call(Another, rabbit_version, desired_for_scope,
+ [mnesia]) of
+ {badrpc, {'EXIT', {undef, _}}} -> ErrFun(unknown_old_version);
+ {badrpc, Reason} -> ErrFun({unknown, Reason});
+ CV -> case rabbit_version:matches(
+ MyVersion, CV) of
+ true -> secondary;
+ false -> ErrFun(CV)
+ end
+ end
end.
-vertices(Module, Steps) ->
- [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
+is_disc_node() ->
+ %% This is pretty ugly but we can't start Mnesia and ask it (will hang),
+ %% we can't look at the config file (may not include us even if we're a
+ %% disc node).
+ filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")).
+
+die(Msg, Args) ->
+ %% We don't throw or exit here since that gets thrown
+ %% straight out into do_boot, generating an erl_crash.dump
+ %% and displaying any error message in a confusing way.
+ error_logger:error_msg(Msg, Args),
+ io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args),
+ error_logger:logfile(close),
+ halt(1).
+
+primary_upgrade(Upgrades, Nodes) ->
+ Others = Nodes -- [node()],
+ ok = apply_upgrades(
+ mnesia,
+ Upgrades,
+ fun () ->
+ force_tables(),
+ case Others of
+ [] -> ok;
+ _ -> info("mnesia upgrades: Breaking cluster~n", []),
+ [{atomic, ok} = mnesia:del_table_copy(schema, Node)
+ || Node <- Others]
+ end
+ end),
+ ok.
-edges(_Module, Steps) ->
- [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
+force_tables() ->
+ [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()].
-unknown_heads(Heads, G) ->
- [H || H <- Heads, digraph:vertex(G, H) =:= false].
+secondary_upgrade(AllNodes) ->
+ %% must do this before we wipe out schema
+ IsDiscNode = is_disc_node(),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
+ cannot_delete_schema),
+ %% Note that we cluster with all nodes, rather than all disc nodes
+ %% (as we can't know all disc nodes at this point). This is safe as
+ %% we're not writing the cluster config, just setting up Mnesia.
+ ClusterNodes = case IsDiscNode of
+ true -> AllNodes;
+ false -> AllNodes -- [node()]
+ end,
+ rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end),
+ ok = rabbit_version:record_desired_for_scope(mnesia),
+ ok.
-upgrades_to_apply(Heads, G) ->
- %% Take all the vertices which can reach the known heads. That's
- %% everything we've already applied. Subtract that from all
- %% vertices: that's what we have to apply.
- Unsorted = sets:to_list(
- sets:subtract(
- sets:from_list(digraph:vertices(G)),
- sets:from_list(digraph_utils:reaching(Heads, G)))),
- %% Form a subgraph from that list and find a topological ordering
- %% so we can invoke them in order.
- [element(2, digraph:vertex(G, StepName)) ||
- StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+nodes_running(Nodes) ->
+ [N || N <- Nodes, node_running(N)].
-heads(G) ->
- lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
+node_running(Node) ->
+ case rpc:call(Node, application, which_applications, []) of
+ {badrpc, _} -> false;
+ Apps -> lists:keysearch(rabbit, 1, Apps) =/= false
+ end.
%% -------------------------------------------------------------------
-apply_upgrades(Upgrades) ->
- LockFile = lock_filename(dir()),
- case rabbit_misc:lock_file(LockFile) of
- ok ->
- BackupDir = dir() ++ "-upgrade-backup",
- info("Upgrades: ~w to apply~n", [length(Upgrades)]),
- case rabbit_mnesia:copy_db(BackupDir) of
- ok ->
- %% We need to make the backup after creating the
- %% lock file so that it protects us from trying to
- %% overwrite the backup. Unfortunately this means
- %% the lock file exists in the backup too, which
- %% is not intuitive. Remove it.
- ok = file:delete(lock_filename(BackupDir)),
- info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]),
- [apply_upgrade(Upgrade) || Upgrade <- Upgrades],
- info("Upgrades: All upgrades applied successfully~n", []),
- ok = write_version(),
- ok = rabbit_misc:recursive_delete([BackupDir]),
- info("Upgrades: Mnesia backup removed~n", []),
- ok = file:delete(LockFile);
- {error, E} ->
- %% If we can't backup, the upgrade hasn't started
- %% hence we don't need the lockfile since the real
- %% mnesia dir is the good one.
- ok = file:delete(LockFile),
- throw({could_not_back_up_mnesia_dir, E})
- end;
- {error, eexist} ->
- throw({error, previous_upgrade_failed})
+maybe_upgrade_local() ->
+ case rabbit_version:upgrades_required(local) of
+ {error, version_not_available} -> version_not_available;
+ {error, _} = Err -> throw(Err);
+ {ok, []} -> ensure_backup_removed(),
+ ok;
+ {ok, Upgrades} -> mnesia:stop(),
+ ensure_backup_taken(),
+ ok = apply_upgrades(local, Upgrades,
+ fun () -> ok end),
+ ensure_backup_removed(),
+ ok
end.
-apply_upgrade({M, F}) ->
- info("Upgrades: Applying ~w:~w~n", [M, F]),
+%% -------------------------------------------------------------------
+
+apply_upgrades(Scope, Upgrades, Fun) ->
+ ok = rabbit_misc:lock_file(lock_filename()),
+ info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]),
+ rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ Fun(),
+ [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades],
+ info("~s upgrades: All upgrades applied successfully~n", [Scope]),
+ ok = rabbit_version:record_desired_for_scope(Scope),
+ ok = file:delete(lock_filename()).
+
+apply_upgrade(Scope, {M, F}) ->
+ info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]),
ok = apply(M, F, []).
%% -------------------------------------------------------------------
dir() -> rabbit_mnesia:dir().
-schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).
-
+lock_filename() -> lock_filename(dir()).
lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME).
+backup_dir() -> dir() ++ "-upgrade-backup".
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b9dbe418..7567c29e 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -20,12 +20,12 @@
-compile([export_all]).
--rabbit_upgrade({remove_user_scope, []}).
--rabbit_upgrade({hash_passwords, []}).
--rabbit_upgrade({add_ip_to_listener, []}).
--rabbit_upgrade({internal_exchanges, []}).
--rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
--rabbit_upgrade({topic_trie, []}).
+-rabbit_upgrade({remove_user_scope, mnesia, []}).
+-rabbit_upgrade({hash_passwords, mnesia, []}).
+-rabbit_upgrade({add_ip_to_listener, mnesia, []}).
+-rabbit_upgrade({internal_exchanges, mnesia, []}).
+-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
+-rabbit_upgrade({topic_trie, mnesia, []}).
%% -------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8c9d62a7..ff7252fd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -150,10 +150,13 @@
%% responsive.
%%
%% In the queue we keep track of both messages that are pending
-%% delivery and messages that are pending acks. This ensures that
-%% purging (deleting the former) and deletion (deleting the former and
-%% the latter) are both cheap and do require any scanning through qi
-%% segments.
+%% delivery and messages that are pending acks. In the event of a
+%% queue purge, we only need to load qi segments if the queue has
+%% elements in deltas (i.e. it came under significant memory
+%% pressure). In the event of a queue deletion, in addition to the
+%% preceding, by keeping track of pending acks in RAM, we do not need
+%% to search through qi segments looking for messages that are yet to
+%% be acknowledged.
%%
%% Pending acks are recorded in memory either as the tuple {SeqId,
%% MsgId, MsgProps} (tuple-form) or as the message itself (message-
@@ -298,7 +301,7 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({multiple_routing_keys, []}).
+-rabbit_upgrade({multiple_routing_keys, local, []}).
-ifdef(use_specs).
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
new file mode 100644
index 00000000..400abc10
--- /dev/null
+++ b/src/rabbit_version.erl
@@ -0,0 +1,172 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_version).
+
+-export([recorded/0, matches/2, desired/0, desired_for_scope/1,
+ record_desired/0, record_desired_for_scope/1,
+ upgrades_required/1]).
+
+%% -------------------------------------------------------------------
+-ifdef(use_specs).
+
+-export_type([scope/0, step/0]).
+
+-type(scope() :: atom()).
+-type(scope_version() :: [atom()]).
+-type(step() :: {atom(), atom()}).
+
+-type(version() :: [atom()]).
+
+-spec(recorded/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
+-spec(matches/2 :: ([A], [A]) -> boolean()).
+-spec(desired/0 :: () -> version()).
+-spec(desired_for_scope/1 :: (scope()) -> scope_version()).
+-spec(record_desired/0 :: () -> 'ok').
+-spec(record_desired_for_scope/1 ::
+ (scope()) -> rabbit_types:ok_or_error(any())).
+-spec(upgrades_required/1 ::
+ (scope()) -> rabbit_types:ok_or_error2([step()], any())).
+
+-endif.
+%% -------------------------------------------------------------------
+
+-define(VERSION_FILENAME, "schema_version").
+-define(SCOPES, [mnesia, local]).
+
+%% -------------------------------------------------------------------
+
+recorded() -> case rabbit_misc:read_term_file(schema_filename()) of
+ {ok, [V]} -> {ok, V};
+ {error, _} = Err -> Err
+ end.
+
+record(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]).
+
+recorded_for_scope(Scope) ->
+ case recorded() of
+ {error, _} = Err ->
+ Err;
+ {ok, Version} ->
+ {ok, case lists:keysearch(Scope, 1, categorise_by_scope(Version)) of
+ false -> [];
+ {value, {Scope, SV1}} -> SV1
+ end}
+ end.
+
+record_for_scope(Scope, ScopeVersion) ->
+ case recorded() of
+ {error, _} = Err ->
+ Err;
+ {ok, Version} ->
+ Version1 = lists:keystore(Scope, 1, categorise_by_scope(Version),
+ {Scope, ScopeVersion}),
+ ok = record([Name || {_Scope, Names} <- Version1, Name <- Names])
+ end.
+
+%% -------------------------------------------------------------------
+
+matches(VerA, VerB) ->
+ lists:usort(VerA) =:= lists:usort(VerB).
+
+%% -------------------------------------------------------------------
+
+desired() -> [Name || Scope <- ?SCOPES, Name <- desired_for_scope(Scope)].
+
+desired_for_scope(Scope) -> with_upgrade_graph(fun heads/1, Scope).
+
+record_desired() -> record(desired()).
+
+record_desired_for_scope(Scope) ->
+ record_for_scope(Scope, desired_for_scope(Scope)).
+
+upgrades_required(Scope) ->
+ case recorded_for_scope(Scope) of
+ {error, enoent} ->
+ {error, version_not_available};
+ {ok, CurrentHeads} ->
+ with_upgrade_graph(
+ fun (G) ->
+ case unknown_heads(CurrentHeads, G) of
+ [] -> {ok, upgrades_to_apply(CurrentHeads, G)};
+ Unknown -> {error, {future_upgrades_found, Unknown}}
+ end
+ end, Scope)
+ end.
+
+%% -------------------------------------------------------------------
+
+with_upgrade_graph(Fun, Scope) ->
+ case rabbit_misc:build_acyclic_graph(
+ fun (Module, Steps) -> vertices(Module, Steps, Scope) end,
+ fun (Module, Steps) -> edges(Module, Steps, Scope) end,
+ rabbit_misc:all_module_attributes(rabbit_upgrade)) of
+ {ok, G} -> try
+ Fun(G)
+ after
+ true = digraph:delete(G)
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ throw({error, {duplicate_upgrade_step, StepName}});
+ {error, {edge, {bad_vertex, StepName}, _From, _To}} ->
+ throw({error, {dependency_on_unknown_upgrade_step, StepName}});
+ {error, {edge, {bad_edge, StepNames}, _From, _To}} ->
+ throw({error, {cycle_in_upgrade_steps, StepNames}})
+ end.
+
+vertices(Module, Steps, Scope0) ->
+ [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps,
+ Scope0 == Scope1].
+
+edges(_Module, Steps, Scope0) ->
+ [{Require, StepName} || {StepName, Scope1, Requires} <- Steps,
+ Require <- Requires,
+ Scope0 == Scope1].
+unknown_heads(Heads, G) ->
+ [H || H <- Heads, digraph:vertex(G, H) =:= false].
+
+upgrades_to_apply(Heads, G) ->
+ %% Take all the vertices which can reach the known heads. That's
+ %% everything we've already applied. Subtract that from all
+ %% vertices: that's what we have to apply.
+ Unsorted = sets:to_list(
+ sets:subtract(
+ sets:from_list(digraph:vertices(G)),
+ sets:from_list(digraph_utils:reaching(Heads, G)))),
+ %% Form a subgraph from that list and find a topological ordering
+ %% so we can invoke them in order.
+ [element(2, digraph:vertex(G, StepName)) ||
+ StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+
+heads(G) ->
+ lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
+
+%% -------------------------------------------------------------------
+
+categorise_by_scope(Version) when is_list(Version) ->
+ Categorised =
+ [{Scope, Name} || {_Module, Attributes} <-
+ rabbit_misc:all_module_attributes(rabbit_upgrade),
+ {Name, Scope, _Requires} <- Attributes,
+ lists:member(Name, Version)],
+ orddict:to_list(
+ lists:foldl(fun ({Scope, Name}, CatVersion) ->
+ rabbit_misc:orddict_cons(Scope, Name, CatVersion)
+ end, orddict:new(), Categorised)).
+
+dir() -> rabbit_mnesia:dir().
+
+schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).