diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-28 17:41:52 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-28 17:41:52 +0100 |
commit | a8ff68e6b4a856778897d8a1c357a1bf6cfcb1f3 (patch) | |
tree | 908de780e19a373d309f4ede18f3e5b82ca5ceb7 | |
parent | b8f6018e0dc8ee6f5e67ee1d58fafa938185c42d (diff) | |
parent | dbe5902eebece9ef99c8688634fe32558a23a10c (diff) | |
download | rabbitmq-server-a8ff68e6b4a856778897d8a1c357a1bf6cfcb1f3.tar.gz |
Merge default into bug23968bug23968
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 3 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/control | 5 | ||||
-rw-r--r-- | src/gm.erl | 134 | ||||
-rw-r--r-- | src/gm_speed_test.erl | 82 | ||||
-rw-r--r-- | src/rabbit.erl | 5 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 23 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 1 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 6 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 1 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 127 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 47 | ||||
-rw-r--r-- | src/rabbit_prelaunch.erl | 2 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 331 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 12 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 13 | ||||
-rw-r--r-- | src/rabbit_version.erl | 172 |
18 files changed, 727 insertions, 245 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index ae9b2059..45af770a 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -120,6 +120,9 @@ done rm -rf %{buildroot} %changelog +* Tue Mar 22 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.0-1 +- New Upstream Release + * Thu Feb 3 2011 simon@rabbitmq.com 2.3.1-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 12165dc0..2ca5074f 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.4.0-1) lucid; urgency=low + + * New Upstream Release + + -- Alexandru Scvortov <alexandru@rabbitmq.com> Tue, 22 Mar 2011 17:34:31 +0000 + rabbitmq-server (2.3.1-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 02da0cc6..45f5c5c4 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -7,10 +7,7 @@ Standards-Version: 3.8.0 Package: rabbitmq-server Architecture: all -# erlang-inets is not a strict dependency, but it's needed to allow -# the installation of plugins that use mochiweb. Ideally it would be a -# "Recommends" instead, but gdebi does not install those. -Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), erlang-inets | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} +Depends: erlang-nox (>= 1:12.b.3), adduser, logrotate, ${misc:Depends} Description: An AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and @@ -376,15 +376,16 @@ confirmed_broadcast/2, group_members/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_info/2]). + code_change/3, prioritise_cast/2, prioritise_info/2]). -export([behaviour_info/1]). --export([table_definitions/0]). +-export([table_definitions/0, flush/1]). -define(GROUP_TABLE, gm_group). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(BROADCAST_TIMER, 25). -define(SETS, ordsets). -define(DICT, orddict). @@ -398,7 +399,9 @@ pub_count, members_state, callback_args, - confirms + confirms, + broadcast_buffer, + broadcast_timer }). -record(gm_group, { name, version, members }). @@ -508,21 +511,26 @@ confirmed_broadcast(Server, Msg) -> group_members(Server) -> gen_server2:call(Server, group_members, infinity). +flush(Server) -> + gen_server2:cast(Server, flush). + init([GroupName, Module, Args]) -> random:seed(now()), gen_server2:cast(self(), join), Self = self(), - {ok, #state { self = Self, - left = {Self, undefined}, - right = {Self, undefined}, - group_name = GroupName, - module = Module, - view = undefined, - pub_count = 0, - members_state = undefined, - callback_args = Args, - confirms = queue:new() }, hibernate, + {ok, #state { self = Self, + left = {Self, undefined}, + right = {Self, undefined}, + group_name = GroupName, + module = Module, + view = undefined, + pub_count = 0, + members_state = undefined, + callback_args = Args, + confirms = queue:new(), + broadcast_buffer = [], + broadcast_timer = undefined }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -620,7 +628,11 @@ handle_cast(join, State = #state { self = Self, {Module:joined(Args, all_known_members(View)), State1}); handle_cast(leave, State) -> - {stop, normal, State}. + {stop, normal, State}; + +handle_cast(flush, State) -> + noreply( + flush_broadcast_buffer(State #state { broadcast_timer = undefined })). handle_info({'DOWN', MRef, process, _Pid, _Reason}, @@ -662,14 +674,17 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason}, end. -terminate(Reason, #state { module = Module, - callback_args = Args }) -> +terminate(Reason, State = #state { module = Module, + callback_args = Args }) -> + flush_broadcast_buffer(State), Module:terminate(Args, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. +prioritise_cast(flush, _State) -> 1; +prioritise_cast(_ , _State) -> 0. prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1; prioritise_info(_ , _State) -> 0. @@ -782,33 +797,62 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, State, hibernate}. + {noreply, ensure_broadcast_timer(State), hibernate}. reply(Reply, State) -> - {reply, Reply, State, hibernate}. - -internal_broadcast(Msg, From, State = #state { self = Self, - pub_count = PubCount, - members_state = MembersState, - module = Module, - confirms = Confirms, - callback_args = Args }) -> - PubMsg = {PubCount, Msg}, - Activity = activity_cons(Self, [PubMsg], [], activity_nil()), - ok = maybe_send_activity(activity_finalise(Activity), State), - MembersState1 = - with_member( - fun (Member = #member { pending_ack = PA }) -> - Member #member { pending_ack = queue:in(PubMsg, PA) } - end, Self, MembersState), + {reply, Reply, ensure_broadcast_timer(State), hibernate}. + +ensure_broadcast_timer(State = #state { broadcast_buffer = [], + broadcast_timer = undefined }) -> + State; +ensure_broadcast_timer(State = #state { broadcast_buffer = [], + broadcast_timer = TRef }) -> + timer:cancel(TRef), + State #state { broadcast_timer = undefined }; +ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> + {ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]), + State #state { broadcast_timer = TRef }; +ensure_broadcast_timer(State) -> + State. + +internal_broadcast(Msg, From, State = #state { self = Self, + pub_count = PubCount, + module = Module, + confirms = Confirms, + callback_args = Args, + broadcast_buffer = Buffer }) -> + Result = Module:handle_msg(Args, Self, Msg), + Buffer1 = [{PubCount, Msg} | Buffer], Confirms1 = case From of none -> Confirms; _ -> queue:in({PubCount, From}, Confirms) end, - handle_callback_result({Module:handle_msg(Args, Self, Msg), - State #state { pub_count = PubCount + 1, - members_state = MembersState1, - confirms = Confirms1 }}). + State1 = State #state { pub_count = PubCount + 1, + confirms = Confirms1, + broadcast_buffer = Buffer1 }, + case From =/= none of + true -> + handle_callback_result({Result, flush_broadcast_buffer(State1)}); + false -> + handle_callback_result( + {Result, State1 #state { broadcast_buffer = Buffer1 }}) + end. + +flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> + State; +flush_broadcast_buffer(State = #state { self = Self, + members_state = MembersState, + broadcast_buffer = Buffer }) -> + Pubs = lists:reverse(Buffer), + Activity = activity_cons(Self, Pubs, [], activity_nil()), + ok = maybe_send_activity(activity_finalise(Activity), State), + MembersState1 = with_member( + fun (Member = #member { pending_ack = PA }) -> + PA1 = queue:join(PA, queue:from_list(Pubs)), + Member #member { pending_ack = PA1 } + end, Self, MembersState), + State #state { members_state = MembersState1, + broadcast_buffer = [] }. %% --------------------------------------------------------------------------- @@ -1093,16 +1137,22 @@ maybe_monitor(Self, Self) -> maybe_monitor(Other, _Self) -> erlang:monitor(process, Other). -check_neighbours(State = #state { self = Self, - left = Left, - right = Right, - view = View }) -> +check_neighbours(State = #state { self = Self, + left = Left, + right = Right, + view = View, + broadcast_buffer = Buffer }) -> #view_member { left = VLeft, right = VRight } = fetch_view_member(Self, View), Ver = view_version(View), Left1 = ensure_neighbour(Ver, Self, Left, VLeft), Right1 = ensure_neighbour(Ver, Self, Right, VRight), - State1 = State #state { left = Left1, right = Right1 }, + Buffer1 = case Right1 of + {Self, undefined} -> []; + _ -> Buffer + end, + State1 = State #state { left = Left1, right = Right1, + broadcast_buffer = Buffer1 }, ok = maybe_send_catchup(Right, State1), State1. diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl new file mode 100644 index 00000000..defb0f29 --- /dev/null +++ b/src/gm_speed_test.erl @@ -0,0 +1,82 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(gm_speed_test). + +-export([test/3]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). +-export([wile_e_coyote/2]). + +-behaviour(gm). + +-include("gm_specs.hrl"). + +%% callbacks + +joined(Owner, _Members) -> + Owner ! joined, + ok. + +members_changed(_Owner, _Births, _Deaths) -> + ok. + +handle_msg(Owner, _From, ping) -> + Owner ! ping, + ok. + +terminate(Owner, _Reason) -> + Owner ! terminated, + ok. + +%% other + +wile_e_coyote(Time, WriteUnit) -> + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()), + receive joined -> ok end, + timer:sleep(1000), %% wait for all to join + timer:send_after(Time, stop), + Start = now(), + {Sent, Received} = loop(Pid, WriteUnit, 0, 0), + End = now(), + ok = gm:leave(Pid), + receive terminated -> ok end, + Elapsed = timer:now_diff(End, Start) / 1000000, + io:format("Sending rate: ~p msgs/sec~nReceiving rate: ~p msgs/sec~n~n", + [Sent/Elapsed, Received/Elapsed]), + ok. + +loop(Pid, WriteUnit, Sent, Received) -> + case read(Received) of + {stop, Received1} -> {Sent, Received1}; + {ok, Received1} -> ok = write(Pid, WriteUnit), + loop(Pid, WriteUnit, Sent + WriteUnit, Received1) + end. + +read(Count) -> + receive + ping -> read(Count + 1); + stop -> {stop, Count} + after 5 -> + {ok, Count} + end. + +write(_Pid, 0) -> ok; +write(Pid, N) -> ok = gm:broadcast(Pid, ping), + write(Pid, N - 1). + +test(Time, WriteUnit, Nodes) -> + ok = gm:create_tables(), + [spawn(Node, ?MODULE, wile_e_coyote, [Time, WriteUnit]) || Node <- Nodes]. diff --git a/src/rabbit.erl b/src/rabbit.erl index c9a929ae..807e9e7d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -192,7 +192,8 @@ %%---------------------------------------------------------------------------- prepare() -> - ok = ensure_working_log_handlers(). + ok = ensure_working_log_handlers(), + ok = rabbit_upgrade:maybe_upgrade_mnesia(). start() -> try @@ -233,6 +234,7 @@ rotate_logs(BinarySuffix) -> start(normal, []) -> case erts_version_check() of ok -> + ok = rabbit_mnesia:delete_previously_running_nodes(), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), @@ -245,6 +247,7 @@ start(normal, []) -> end. stop(_State) -> + ok = rabbit_mnesia:record_running_nodes(), terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 7ddb7814..6167790e 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -331,17 +331,18 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). maybe_auto_delete(XName, Bindings, Deletions) -> - case mnesia:read({rabbit_exchange, XName}) of - [] -> - add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); - [X] -> - add_deletion(XName, {X, not_deleted, Bindings}, - case rabbit_exchange:maybe_auto_delete(X) of - not_deleted -> Deletions; - {deleted, Deletions1} -> combine_deletions( - Deletions, Deletions1) - end) - end. + {Entry, Deletions1} = + case mnesia:read({rabbit_exchange, XName}) of + [] -> {{undefined, not_deleted, Bindings}, Deletions}; + [X] -> case rabbit_exchange:maybe_auto_delete(X) of + not_deleted -> + {{X, not_deleted, Bindings}, Deletions}; + {deleted, Deletions2} -> + {{X, deleted, Bindings}, + combine_deletions(Deletions, Deletions2)} + end + end, + add_deletion(XName, Entry, Deletions1). delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c12614c..5099bf3f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -156,7 +156,6 @@ ready_for_close(Pid) -> init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, Capabilities, CollectorPid, StartLimiterFun]) -> - process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 0120f0d6..3fb0817a 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -67,8 +67,12 @@ publish(_Other, _Format, _Data, _State) -> ok. publish1(RoutingKey, Format, Data, LogExch) -> + %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's + %% second resolution, not millisecond. + Timestamp = rabbit_misc:now_ms() div 1000, {ok, _RoutingRes, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, false, false, none, - #'P_basic'{content_type = <<"text/plain">>}, + #'P_basic'{content_type = <<"text/plain">>, + timestamp = Timestamp}, list_to_binary(io_lib:format(Format, Data))), ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e79a58a1..2e9563cf 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -871,4 +871,3 @@ is_process_alive(Pid) -> true -> true; _ -> false end. - diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 66436920..fbcf07ae 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -18,9 +18,12 @@ -module(rabbit_mnesia). -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, force_cluster/1, reset/0, force_reset/0, + cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]). + empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, + create_cluster_nodes_config/1, read_cluster_nodes_config/0, + record_running_nodes/0, read_previously_running_nodes/0, + delete_previously_running_nodes/0, running_nodes_filename/0]). -export([table_names/0]). @@ -42,6 +45,7 @@ -spec(dir/0 :: () -> file:filename()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). +-spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([node()]) -> 'ok'). -spec(force_cluster/1 :: ([node()]) -> 'ok'). @@ -55,6 +59,12 @@ -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). +-spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). +-spec(read_cluster_nodes_config/0 :: () -> [node()]). +-spec(record_running_nodes/0 :: () -> 'ok'). +-spec(read_previously_running_nodes/0 :: () -> [node()]). +-spec(delete_previously_running_nodes/0 :: () -> 'ok'). +-spec(running_nodes_filename/0 :: () -> file:filename()). -endif. @@ -78,9 +88,10 @@ status() -> {running_nodes, running_clustered_nodes()}]. init() -> - ok = ensure_mnesia_running(), - ok = ensure_mnesia_dir(), - ok = init_db(read_cluster_nodes_config(), true), + ensure_mnesia_running(), + ensure_mnesia_dir(), + ok = init_db(read_cluster_nodes_config(), true, + fun maybe_upgrade_local_or_record_desired/0), ok. is_db_empty() -> @@ -98,11 +109,12 @@ force_cluster(ClusterNodes) -> %% node. If Force is false, only connections to online nodes are %% allowed. cluster(ClusterNodes, Force) -> - ok = ensure_mnesia_not_running(), - ok = ensure_mnesia_dir(), + ensure_mnesia_not_running(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try - ok = init_db(ClusterNodes, Force), + ok = init_db(ClusterNodes, Force, + fun maybe_upgrade_local_or_record_desired/0), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() @@ -367,11 +379,40 @@ delete_cluster_nodes_config() -> FileName, Reason}}) end. +running_nodes_filename() -> + filename:join(dir(), "nodes_running_at_shutdown"). + +record_running_nodes() -> + FileName = running_nodes_filename(), + Nodes = running_clustered_nodes() -- [node()], + %% Don't check the result: we're shutting down anyway and this is + %% a best-effort-basis. + rabbit_misc:write_term_file(FileName, [Nodes]), + ok. + +read_previously_running_nodes() -> + FileName = running_nodes_filename(), + case rabbit_misc:read_term_file(FileName) of + {ok, [Nodes]} -> Nodes; + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, + FileName, Reason}}) + end. + +delete_previously_running_nodes() -> + FileName = running_nodes_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, + FileName, Reason}}) + end. + %% Take a cluster node config and create the right kind of node - a %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow %% connections to offline nodes. -init_db(ClusterNodes, Force) -> +init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> UClusterNodes = lists:usort(ClusterNodes), ProperClusterNodes = UClusterNodes -- [node()], case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of @@ -387,26 +428,21 @@ init_db(ClusterNodes, Force) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of - {[], true, [_]} -> - %% True single disc node, attempt upgrade - case rabbit_upgrade:maybe_upgrade() of - ok -> ensure_schema_integrity(); - version_not_available -> schema_ok_or_move() - end; - {[], true, _} -> - %% "Master" (i.e. without config) disc node in cluster, - %% verify schema - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_integrity(); - {[], false, _} -> + case {Nodes, mnesia:system_info(use_dir)} of + {[], false} -> %% Nothing there at all, start from scratch ok = create_schema(); - {[AnotherNode|_], _, _} -> + {[], true} -> + %% We're the first node up + case rabbit_upgrade:maybe_upgrade_local() of + ok -> ensure_schema_integrity(); + version_not_available -> ok = schema_ok_or_move() + end, + ok; + {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up - ensure_version_ok(rabbit_upgrade:read_version()), ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + rpc:call(AnotherNode, rabbit_version, recorded, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -415,7 +451,9 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - ensure_schema_integrity() + ok = SecondaryPostMnesiaFun(), + ensure_schema_integrity(), + ok end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -424,6 +462,14 @@ init_db(ClusterNodes, Force) -> throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) end. +maybe_upgrade_local_or_record_desired() -> + case rabbit_upgrade:maybe_upgrade_local() of + ok -> ok; + %% If we're just starting up a new node we won't have a + %% version + version_not_available -> ok = rabbit_version:record_desired() + end. + schema_ok_or_move() -> case check_schema_integrity() of ok -> @@ -440,13 +486,13 @@ schema_ok_or_move() -> end. ensure_version_ok({ok, DiscVersion}) -> - case rabbit_upgrade:desired_version() of - DiscVersion -> ok; - DesiredVersion -> throw({error, {schema_mismatch, - DesiredVersion, DiscVersion}}) + DesiredVersion = rabbit_version:desired(), + case rabbit_version:matches(DesiredVersion, DiscVersion) of + true -> ok; + false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}}) end; ensure_version_ok({error, _}) -> - ok = rabbit_upgrade:write_version(). + ok = rabbit_version:record_desired(). create_schema() -> mnesia:stop(), @@ -455,8 +501,8 @@ create_schema() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok = create_tables(), - ok = ensure_schema_integrity(), - ok = rabbit_upgrade:write_version(). + ensure_schema_integrity(), + ok = rabbit_version:record_desired(). move_db() -> mnesia:stop(), @@ -476,18 +522,13 @@ move_db() -> {error, Reason} -> throw({error, {cannot_backup_mnesia, MnesiaDir, BackupDir, Reason}}) end, - ok = ensure_mnesia_dir(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok. copy_db(Destination) -> - mnesia:stop(), - case rabbit_misc:recursive_copy(dir(), Destination) of - ok -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia); - {error, E} -> - {error, E} - end. + ok = ensure_mnesia_not_running(), + rabbit_misc:recursive_copy(dir(), Destination). create_tables() -> lists:foreach(fun ({Tab, TabDef}) -> @@ -561,12 +602,12 @@ wait_for_tables(TableNames) -> end. reset(Force) -> - ok = ensure_mnesia_not_running(), + ensure_mnesia_not_running(), Node = node(), case Force of true -> ok; false -> - ok = ensure_mnesia_dir(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), {Nodes, RunningNodes} = try diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index bc68d2cd..e3aae572 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -820,15 +820,16 @@ terminate(_Reason, State = #msstate { index_state = IndexState, State1 = case CurHdl of undefined -> State; _ -> State2 = internal_sync(State), - file_handle_cache:close(CurHdl), + ok = file_handle_cache:close(CurHdl), State2 end, State3 = close_all_handles(State1), - store_file_summary(FileSummaryEts, Dir), - [ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]], + ok = store_file_summary(FileSummaryEts, Dir), + [true = ets:delete(T) || + T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]], IndexModule:terminate(IndexState), - store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, - {index_module, IndexModule}], Dir), + ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, + {index_module, IndexModule}], Dir), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -881,13 +882,16 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, false -> [{CRef, MsgIds} | NS] end end, [], CTM), - case {Syncs, CGs} of - {[], []} -> ok; - _ -> file_handle_cache:sync(CurHdl) - end, + ok = case {Syncs, CGs} of + {[], []} -> ok; + _ -> file_handle_cache:sync(CurHdl) + end, [K() || K <- lists:reverse(Syncs)], - [client_confirm(CRef, MsgIds, written, State1) || {CRef, MsgIds} <- CGs], - State1 #msstate { cref_to_msg_ids = dict:new(), on_sync = [] }. + State2 = lists:foldl( + fun ({CRef, MsgIds}, StateN) -> + client_confirm(CRef, MsgIds, written, StateN) + end, State1, CGs), + State2 #msstate { on_sync = [] }. write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; @@ -1384,7 +1388,7 @@ recover_file_summary(false, _Dir) -> recover_file_summary(true, Dir) -> Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME), case ets:file2tab(Path) of - {ok, Tid} -> file:delete(Path), + {ok, Tid} -> ok = file:delete(Path), {true, Tid}; {error, _Error} -> recover_file_summary(false, Dir) end. @@ -1451,9 +1455,7 @@ scan_file_for_valid_messages(Dir, FileName) -> Hdl, filelib:file_size( form_filename(Dir, FileName)), fun scan_fun/2, []), - %% if something really bad has happened, - %% the close could fail, but ignore - file_handle_cache:close(Hdl), + ok = file_handle_cache:close(Hdl), Valid; {error, enoent} -> {ok, [], 0}; {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} @@ -1889,32 +1891,33 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, force_recovery(BaseDir, Store) -> Dir = filename:join(BaseDir, atom_to_list(Store)), - file:delete(filename:join(Dir, ?CLEAN_FILENAME)), + ok = file:delete(filename:join(Dir, ?CLEAN_FILENAME)), recover_crashed_compactions(BaseDir), ok. foreach_file(D, Fun, Files) -> - [Fun(filename:join(D, File)) || File <- Files]. + [ok = Fun(filename:join(D, File)) || File <- Files]. foreach_file(D1, D2, Fun, Files) -> - [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files]. + [ok = Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files]. transform_dir(BaseDir, Store, TransformFun) -> Dir = filename:join(BaseDir, atom_to_list(Store)), TmpDir = filename:join(Dir, ?TRANSFORM_TMP), TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end, + CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end, case filelib:is_dir(TmpDir) of true -> throw({error, transform_failed_previously}); false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION), foreach_file(Dir, TmpDir, TransformFile, FileList), foreach_file(Dir, fun file:delete/1, FileList), - foreach_file(TmpDir, Dir, fun file:copy/2, FileList), + foreach_file(TmpDir, Dir, CopyFile, FileList), foreach_file(TmpDir, fun file:delete/1, FileList), ok = file:del_dir(TmpDir) end. transform_msg_file(FileOld, FileNew, TransformFun) -> - rabbit_misc:ensure_parent_dirs_exist(FileNew), + ok = rabbit_misc:ensure_parent_dirs_exist(FileNew), {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []), {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write], [{write_buffer, @@ -1927,6 +1930,6 @@ transform_msg_file(FileOld, FileNew, TransformFun) -> {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew), ok end, ok), - file_handle_cache:close(RefOld), - file_handle_cache:close(RefNew), + ok = file_handle_cache:close(RefOld), + ok = file_handle_cache:close(RefNew), ok. diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 7bb8c0ea..8800e8d6 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -235,7 +235,7 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> +process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 33c5391b..367953b8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -169,7 +169,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, []}). +-rabbit_upgrade({add_queue_ttl, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index ebda5d03..a2abb1e5 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -16,7 +16,7 @@ -module(rabbit_upgrade). --export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). -include("rabbit.hrl"). @@ -27,141 +27,260 @@ -ifdef(use_specs). --type(step() :: atom()). --type(version() :: [step()]). - --spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). --spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). --spec(write_version/0 :: () -> 'ok'). --spec(desired_version/0 :: () -> version()). +-spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). +-spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). -endif. %% ------------------------------------------------------------------- -%% Try to upgrade the schema. If no information on the existing schema -%% could be found, do nothing. rabbit_mnesia:check_schema_integrity() -%% will catch the problem. -maybe_upgrade() -> - case read_version() of - {ok, CurrentHeads} -> - with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> case upgrades_to_apply(CurrentHeads, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades) - end; - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end); - {error, enoent} -> - version_not_available +%% The upgrade logic is quite involved, due to the existence of +%% clusters. +%% +%% Firstly, we have two different types of upgrades to do: Mnesia and +%% everythinq else. Mnesia upgrades must only be done by one node in +%% the cluster (we treat a non-clustered node as a single-node +%% cluster). This is the primary upgrader. The other upgrades need to +%% be done by all nodes. +%% +%% The primary upgrader has to start first (and do its Mnesia +%% upgrades). Secondary upgraders need to reset their Mnesia database +%% and then rejoin the cluster. They can't do the Mnesia upgrades as +%% well and then merge databases since the cookie for each table will +%% end up different and the merge will fail. +%% +%% This in turn means that we need to determine whether we are the +%% primary or secondary upgrader *before* Mnesia comes up. If we +%% didn't then the secondary upgrader would try to start Mnesia, and +%% either hang waiting for a node which is not yet up, or fail since +%% its schema differs from the other nodes in the cluster. +%% +%% Also, the primary upgrader needs to start Mnesia to do its +%% upgrades, but needs to forcibly load tables rather than wait for +%% them (in case it was not the last node to shut down, in which case +%% it would wait forever). +%% +%% This in turn means that maybe_upgrade_mnesia/0 has to be patched +%% into the boot process by prelaunch before the mnesia application is +%% started. By the time Mnesia is started the upgrades have happened +%% (on the primary), or Mnesia has been reset (on the secondary) and +%% rabbit_mnesia:init_db/3 can then make the node rejoin the cluster +%% in the normal way. +%% +%% The non-mnesia upgrades are then triggered by +%% rabbit_mnesia:init_db/3. Of course, it's possible for a given +%% upgrade process to only require Mnesia upgrades, or only require +%% non-Mnesia upgrades. In the latter case no Mnesia resets and +%% reclusterings occur. +%% +%% The primary upgrader needs to be a disc node. Ideally we would like +%% it to be the last disc node to shut down (since otherwise there's a +%% risk of data loss). On each node we therefore record the disc nodes +%% that were still running when we shut down. A disc node that knows +%% other nodes were up when it shut down, or a ram node, will refuse +%% to be the primary upgrader, and will thus not start when upgrades +%% are needed. +%% +%% However, this is racy if several nodes are shut down at once. Since +%% rabbit records the running nodes, and shuts down before mnesia, the +%% race manifests as all disc nodes thinking they are not the primary +%% upgrader. Therefore the user can remove the record of the last disc +%% node to shut down to get things going again. This may lose any +%% mnesia changes that happened after the node chosen as the primary +%% upgrader was shut down. + +%% ------------------------------------------------------------------- + +ensure_backup_taken() -> + case filelib:is_file(lock_filename()) of + false -> case filelib:is_dir(backup_dir()) of + false -> ok = take_backup(); + _ -> ok + end; + true -> throw({error, previous_upgrade_failed}) end. -read_version() -> - case rabbit_misc:read_term_file(schema_filename()) of - {ok, [Heads]} -> {ok, Heads}; - {error, _} = Err -> Err +take_backup() -> + BackupDir = backup_dir(), + case rabbit_mnesia:copy_db(BackupDir) of + ok -> info("upgrades: Mnesia dir backed up to ~p~n", + [BackupDir]); + {error, E} -> throw({could_not_back_up_mnesia_dir, E}) end. -write_version() -> - ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), - ok. +ensure_backup_removed() -> + case filelib:is_dir(backup_dir()) of + true -> ok = remove_backup(); + _ -> ok + end. -desired_version() -> - with_upgrade_graph(fun (G) -> heads(G) end). +remove_backup() -> + ok = rabbit_misc:recursive_delete([backup_dir()]), + info("upgrades: Mnesia backup removed~n", []). -%% ------------------------------------------------------------------- +maybe_upgrade_mnesia() -> + AllNodes = rabbit_mnesia:all_clustered_nodes(), + case rabbit_version:upgrades_required(mnesia) of + {error, version_not_available} -> + case AllNodes of + [_] -> ok; + _ -> die("Cluster upgrade needed but upgrading from " + "< 2.1.1.~nUnfortunately you will need to " + "rebuild the cluster.", []) + end; + {error, _} = Err -> + throw(Err); + {ok, []} -> + ok; + {ok, Upgrades} -> + ensure_backup_taken(), + ok = case upgrade_mode(AllNodes) of + primary -> primary_upgrade(Upgrades, AllNodes); + secondary -> secondary_upgrade(AllNodes) + end + end. -with_upgrade_graph(Fun) -> - case rabbit_misc:build_acyclic_graph( - fun vertices/2, fun edges/2, - rabbit_misc:all_module_attributes(rabbit_upgrade)) of - {ok, G} -> try - Fun(G) - after - true = digraph:delete(G) - end; - {error, {vertex, duplicate, StepName}} -> - throw({error, {duplicate_upgrade_step, StepName}}); - {error, {edge, {bad_vertex, StepName}, _From, _To}} -> - throw({error, {dependency_on_unknown_upgrade_step, StepName}}); - {error, {edge, {bad_edge, StepNames}, _From, _To}} -> - throw({error, {cycle_in_upgrade_steps, StepNames}}) +upgrade_mode(AllNodes) -> + case nodes_running(AllNodes) of + [] -> + AfterUs = rabbit_mnesia:read_previously_running_nodes(), + case {is_disc_node(), AfterUs} of + {true, []} -> + primary; + {true, _} -> + Filename = rabbit_mnesia:running_nodes_filename(), + die("Cluster upgrade needed but other disc nodes shut " + "down after this one.~nPlease first start the last " + "disc node to shut down.~n~nNote: if several disc " + "nodes were shut down simultaneously they may " + "all~nshow this message. In which case, remove " + "the lock file on one of them and~nstart that node. " + "The lock file on this node is:~n~n ~s ", [Filename]); + {false, _} -> + die("Cluster upgrade needed but this is a ram node.~n" + "Please first start the last disc node to shut down.", + []) + end; + [Another|_] -> + MyVersion = rabbit_version:desired_for_scope(mnesia), + ErrFun = fun (ClusterVersion) -> + %% The other node(s) are running an + %% unexpected version. + die("Cluster upgrade needed but other nodes are " + "running ~p~nand I want ~p", + [ClusterVersion, MyVersion]) + end, + case rpc:call(Another, rabbit_version, desired_for_scope, + [mnesia]) of + {badrpc, {'EXIT', {undef, _}}} -> ErrFun(unknown_old_version); + {badrpc, Reason} -> ErrFun({unknown, Reason}); + CV -> case rabbit_version:matches( + MyVersion, CV) of + true -> secondary; + false -> ErrFun(CV) + end + end end. -vertices(Module, Steps) -> - [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. +is_disc_node() -> + %% This is pretty ugly but we can't start Mnesia and ask it (will hang), + %% we can't look at the config file (may not include us even if we're a + %% disc node). + filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")). + +die(Msg, Args) -> + %% We don't throw or exit here since that gets thrown + %% straight out into do_boot, generating an erl_crash.dump + %% and displaying any error message in a confusing way. + error_logger:error_msg(Msg, Args), + io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), + error_logger:logfile(close), + halt(1). + +primary_upgrade(Upgrades, Nodes) -> + Others = Nodes -- [node()], + ok = apply_upgrades( + mnesia, + Upgrades, + fun () -> + force_tables(), + case Others of + [] -> ok; + _ -> info("mnesia upgrades: Breaking cluster~n", []), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) + || Node <- Others] + end + end), + ok. -edges(_Module, Steps) -> - [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. +force_tables() -> + [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. -unknown_heads(Heads, G) -> - [H || H <- Heads, digraph:vertex(G, H) =:= false]. +secondary_upgrade(AllNodes) -> + %% must do this before we wipe out schema + IsDiscNode = is_disc_node(), + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + %% Note that we cluster with all nodes, rather than all disc nodes + %% (as we can't know all disc nodes at this point). This is safe as + %% we're not writing the cluster config, just setting up Mnesia. + ClusterNodes = case IsDiscNode of + true -> AllNodes; + false -> AllNodes -- [node()] + end, + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end), + ok = rabbit_version:record_desired_for_scope(mnesia), + ok. -upgrades_to_apply(Heads, G) -> - %% Take all the vertices which can reach the known heads. That's - %% everything we've already applied. Subtract that from all - %% vertices: that's what we have to apply. - Unsorted = sets:to_list( - sets:subtract( - sets:from_list(digraph:vertices(G)), - sets:from_list(digraph_utils:reaching(Heads, G)))), - %% Form a subgraph from that list and find a topological ordering - %% so we can invoke them in order. - [element(2, digraph:vertex(G, StepName)) || - StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. +nodes_running(Nodes) -> + [N || N <- Nodes, node_running(N)]. -heads(G) -> - lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). +node_running(Node) -> + case rpc:call(Node, application, which_applications, []) of + {badrpc, _} -> false; + Apps -> lists:keysearch(rabbit, 1, Apps) =/= false + end. %% ------------------------------------------------------------------- -apply_upgrades(Upgrades) -> - LockFile = lock_filename(dir()), - case rabbit_misc:lock_file(LockFile) of - ok -> - BackupDir = dir() ++ "-upgrade-backup", - info("Upgrades: ~w to apply~n", [length(Upgrades)]), - case rabbit_mnesia:copy_db(BackupDir) of - ok -> - %% We need to make the backup after creating the - %% lock file so that it protects us from trying to - %% overwrite the backup. Unfortunately this means - %% the lock file exists in the backup too, which - %% is not intuitive. Remove it. - ok = file:delete(lock_filename(BackupDir)), - info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), - [apply_upgrade(Upgrade) || Upgrade <- Upgrades], - info("Upgrades: All upgrades applied successfully~n", []), - ok = write_version(), - ok = rabbit_misc:recursive_delete([BackupDir]), - info("Upgrades: Mnesia backup removed~n", []), - ok = file:delete(LockFile); - {error, E} -> - %% If we can't backup, the upgrade hasn't started - %% hence we don't need the lockfile since the real - %% mnesia dir is the good one. - ok = file:delete(LockFile), - throw({could_not_back_up_mnesia_dir, E}) - end; - {error, eexist} -> - throw({error, previous_upgrade_failed}) +maybe_upgrade_local() -> + case rabbit_version:upgrades_required(local) of + {error, version_not_available} -> version_not_available; + {error, _} = Err -> throw(Err); + {ok, []} -> ensure_backup_removed(), + ok; + {ok, Upgrades} -> mnesia:stop(), + ensure_backup_taken(), + ok = apply_upgrades(local, Upgrades, + fun () -> ok end), + ensure_backup_removed(), + ok end. -apply_upgrade({M, F}) -> - info("Upgrades: Applying ~w:~w~n", [M, F]), +%% ------------------------------------------------------------------- + +apply_upgrades(Scope, Upgrades, Fun) -> + ok = rabbit_misc:lock_file(lock_filename()), + info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + Fun(), + [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], + info("~s upgrades: All upgrades applied successfully~n", [Scope]), + ok = rabbit_version:record_desired_for_scope(Scope), + ok = file:delete(lock_filename()). + +apply_upgrade(Scope, {M, F}) -> + info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]), ok = apply(M, F, []). %% ------------------------------------------------------------------- dir() -> rabbit_mnesia:dir(). -schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). - +lock_filename() -> lock_filename(dir()). lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). +backup_dir() -> dir() ++ "-upgrade-backup". %% NB: we cannot use rabbit_log here since it may not have been %% started yet diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b9dbe418..7567c29e 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -20,12 +20,12 @@ -compile([export_all]). --rabbit_upgrade({remove_user_scope, []}). --rabbit_upgrade({hash_passwords, []}). --rabbit_upgrade({add_ip_to_listener, []}). --rabbit_upgrade({internal_exchanges, []}). --rabbit_upgrade({user_to_internal_user, [hash_passwords]}). --rabbit_upgrade({topic_trie, []}). +-rabbit_upgrade({remove_user_scope, mnesia, []}). +-rabbit_upgrade({hash_passwords, mnesia, []}). +-rabbit_upgrade({add_ip_to_listener, mnesia, []}). +-rabbit_upgrade({internal_exchanges, mnesia, []}). +-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). +-rabbit_upgrade({topic_trie, mnesia, []}). %% ------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8c9d62a7..ff7252fd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -150,10 +150,13 @@ %% responsive. %% %% In the queue we keep track of both messages that are pending -%% delivery and messages that are pending acks. This ensures that -%% purging (deleting the former) and deletion (deleting the former and -%% the latter) are both cheap and do require any scanning through qi -%% segments. +%% delivery and messages that are pending acks. In the event of a +%% queue purge, we only need to load qi segments if the queue has +%% elements in deltas (i.e. it came under significant memory +%% pressure). In the event of a queue deletion, in addition to the +%% preceding, by keeping track of pending acks in RAM, we do not need +%% to search through qi segments looking for messages that are yet to +%% be acknowledged. %% %% Pending acks are recorded in memory either as the tuple {SeqId, %% MsgId, MsgProps} (tuple-form) or as the message itself (message- @@ -298,7 +301,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({multiple_routing_keys, []}). +-rabbit_upgrade({multiple_routing_keys, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl new file mode 100644 index 00000000..400abc10 --- /dev/null +++ b/src/rabbit_version.erl @@ -0,0 +1,172 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_version). + +-export([recorded/0, matches/2, desired/0, desired_for_scope/1, + record_desired/0, record_desired_for_scope/1, + upgrades_required/1]). + +%% ------------------------------------------------------------------- +-ifdef(use_specs). + +-export_type([scope/0, step/0]). + +-type(scope() :: atom()). +-type(scope_version() :: [atom()]). +-type(step() :: {atom(), atom()}). + +-type(version() :: [atom()]). + +-spec(recorded/0 :: () -> rabbit_types:ok_or_error2(version(), any())). +-spec(matches/2 :: ([A], [A]) -> boolean()). +-spec(desired/0 :: () -> version()). +-spec(desired_for_scope/1 :: (scope()) -> scope_version()). +-spec(record_desired/0 :: () -> 'ok'). +-spec(record_desired_for_scope/1 :: + (scope()) -> rabbit_types:ok_or_error(any())). +-spec(upgrades_required/1 :: + (scope()) -> rabbit_types:ok_or_error2([step()], any())). + +-endif. +%% ------------------------------------------------------------------- + +-define(VERSION_FILENAME, "schema_version"). +-define(SCOPES, [mnesia, local]). + +%% ------------------------------------------------------------------- + +recorded() -> case rabbit_misc:read_term_file(schema_filename()) of + {ok, [V]} -> {ok, V}; + {error, _} = Err -> Err + end. + +record(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]). + +recorded_for_scope(Scope) -> + case recorded() of + {error, _} = Err -> + Err; + {ok, Version} -> + {ok, case lists:keysearch(Scope, 1, categorise_by_scope(Version)) of + false -> []; + {value, {Scope, SV1}} -> SV1 + end} + end. + +record_for_scope(Scope, ScopeVersion) -> + case recorded() of + {error, _} = Err -> + Err; + {ok, Version} -> + Version1 = lists:keystore(Scope, 1, categorise_by_scope(Version), + {Scope, ScopeVersion}), + ok = record([Name || {_Scope, Names} <- Version1, Name <- Names]) + end. + +%% ------------------------------------------------------------------- + +matches(VerA, VerB) -> + lists:usort(VerA) =:= lists:usort(VerB). + +%% ------------------------------------------------------------------- + +desired() -> [Name || Scope <- ?SCOPES, Name <- desired_for_scope(Scope)]. + +desired_for_scope(Scope) -> with_upgrade_graph(fun heads/1, Scope). + +record_desired() -> record(desired()). + +record_desired_for_scope(Scope) -> + record_for_scope(Scope, desired_for_scope(Scope)). + +upgrades_required(Scope) -> + case recorded_for_scope(Scope) of + {error, enoent} -> + {error, version_not_available}; + {ok, CurrentHeads} -> + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> {ok, upgrades_to_apply(CurrentHeads, G)}; + Unknown -> {error, {future_upgrades_found, Unknown}} + end + end, Scope) + end. + +%% ------------------------------------------------------------------- + +with_upgrade_graph(Fun, Scope) -> + case rabbit_misc:build_acyclic_graph( + fun (Module, Steps) -> vertices(Module, Steps, Scope) end, + fun (Module, Steps) -> edges(Module, Steps, Scope) end, + rabbit_misc:all_module_attributes(rabbit_upgrade)) of + {ok, G} -> try + Fun(G) + after + true = digraph:delete(G) + end; + {error, {vertex, duplicate, StepName}} -> + throw({error, {duplicate_upgrade_step, StepName}}); + {error, {edge, {bad_vertex, StepName}, _From, _To}} -> + throw({error, {dependency_on_unknown_upgrade_step, StepName}}); + {error, {edge, {bad_edge, StepNames}, _From, _To}} -> + throw({error, {cycle_in_upgrade_steps, StepNames}}) + end. + +vertices(Module, Steps, Scope0) -> + [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps, + Scope0 == Scope1]. + +edges(_Module, Steps, Scope0) -> + [{Require, StepName} || {StepName, Scope1, Requires} <- Steps, + Require <- Requires, + Scope0 == Scope1]. +unknown_heads(Heads, G) -> + [H || H <- Heads, digraph:vertex(G, H) =:= false]. + +upgrades_to_apply(Heads, G) -> + %% Take all the vertices which can reach the known heads. That's + %% everything we've already applied. Subtract that from all + %% vertices: that's what we have to apply. + Unsorted = sets:to_list( + sets:subtract( + sets:from_list(digraph:vertices(G)), + sets:from_list(digraph_utils:reaching(Heads, G)))), + %% Form a subgraph from that list and find a topological ordering + %% so we can invoke them in order. + [element(2, digraph:vertex(G, StepName)) || + StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + +heads(G) -> + lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). + +%% ------------------------------------------------------------------- + +categorise_by_scope(Version) when is_list(Version) -> + Categorised = + [{Scope, Name} || {_Module, Attributes} <- + rabbit_misc:all_module_attributes(rabbit_upgrade), + {Name, Scope, _Requires} <- Attributes, + lists:member(Name, Version)], + orddict:to_list( + lists:foldl(fun ({Scope, Name}, CatVersion) -> + rabbit_misc:orddict_cons(Scope, Name, CatVersion) + end, orddict:new(), Categorised)). + +dir() -> rabbit_mnesia:dir(). + +schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). |