author     Emile Joubert <emile@rabbitmq.com>      2013-07-31 13:06:16 +0100
committer  Emile Joubert <emile@rabbitmq.com>      2013-07-31 13:06:16 +0100
commit     09b934a686384267afdb28fe3b4bba4d9bcca78c (patch)
tree       97db40e02fbc1aa2043d545da2dda08e046e7096
parent     ac666e08c5405aa0b4e27a7edaaaf05d60e1e55e (diff)
parent     d99108bf76d3ddb972683217ae3e3e62583d036c (diff)
Refresh branch from stable
-rw-r--r--  Makefile                                     |  5
-rw-r--r--  docs/rabbitmqctl.1.xml                       |  7
-rw-r--r--  ebin/rabbit_app.in                           | 26
-rw-r--r--  packaging/standalone/src/rabbit_release.erl  |  4
-rw-r--r--  src/delegate.erl                             | 89
-rw-r--r--  src/gm.erl                                   | 29
-rw-r--r--  src/pmon.erl                                 | 42
-rw-r--r--  src/priority_queue.erl                       |  2
-rw-r--r--  src/rabbit.erl                               | 22
-rw-r--r--  src/rabbit_amqqueue_process.erl              |  8
-rw-r--r--  src/rabbit_autoheal.erl                      |  3
-rw-r--r--  src/rabbit_backing_queue.erl                 | 12
-rw-r--r--  src/rabbit_connection_sup.erl                | 11
-rw-r--r--  src/rabbit_intermediate_sup.erl              | 39
-rw-r--r--  src/rabbit_mirror_queue_master.erl           | 36
-rw-r--r--  src/rabbit_mirror_queue_slave.erl            | 17
-rw-r--r--  src/rabbit_mnesia.erl                        | 97
-rw-r--r--  src/rabbit_networking.erl                    |  3
-rw-r--r--  src/rabbit_node_monitor.erl                  | 11
-rw-r--r--  src/rabbit_reader.erl                        | 25
-rw-r--r--  version.mk                                   |  1
21 files changed, 316 insertions, 173 deletions
diff --git a/Makefile b/Makefile
@@ -56,7 +56,8 @@ endif
 #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
 ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc)
 
-VERSION?=0.0.0
+include version.mk
+
 PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo )
 PLUGINS_DIR=plugins
 TARBALL_NAME=rabbitmq-server-$(VERSION)
@@ -262,6 +263,8 @@ srcdist: distclean
	cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
	cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR)
+	echo "VERSION?=${VERSION}" > $(TARGET_SRC_DIR)/version.mk
+
	cp -r scripts $(TARGET_SRC_DIR)
	cp -r $(DOCS_DIR) $(TARGET_SRC_DIR)
	chmod 0755 $(TARGET_SRC_DIR)/scripts/*
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 0f3c0faf..1d641144 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -405,6 +405,13 @@
          must be offline, while the node we are removing from must be
          online, except when using the <command>--offline</command> flag.
        </para>
+        <para>
+          When using the <command>--offline</command> flag the node you
+          connect to will become the canonical source for cluster metadata
+          (e.g. which queues exist), even if it was not before. Therefore
+          you should use this command on the latest node to shut down if
+          at all possible.
+        </para>
        <para role="example-prefix">For example:</para>
        <screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen>
        <para role="example">
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 339fa69e..a4582e2d 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -14,8 +14,7 @@
 %% we also depend on crypto, public_key and ssl but they shouldn't be
 %% in here as we don't actually want to start it
   {mod, {rabbit, []}},
-  {env, [{hipe_compile, false},
-         {tcp_listeners, [5672]},
+  {env, [{tcp_listeners, [5672]},
          {ssl_listeners, []},
          {ssl_options, []},
          {vm_memory_high_watermark, 0.4},
@@ -51,5 +50,24 @@
                     {backlog, 128},
                     {nodelay, true},
                     {linger, {true, 0}},
-                    {exit_on_close, false}]}
-         ]}]}.
+                    {exit_on_close, false}]},
+         {hipe_compile, false},
+         %% see bug 24513 for how this list was created
+         {hipe_modules,
+          [rabbit_reader, rabbit_channel, gen_server2, rabbit_exchange,
+           rabbit_command_assembler, rabbit_framing_amqp_0_9_1, rabbit_basic,
+           rabbit_event, lists, queue, priority_queue, rabbit_router,
+           rabbit_trace, rabbit_misc, rabbit_binary_parser,
+           rabbit_exchange_type_direct, rabbit_guid, rabbit_net,
+           rabbit_amqqueue_process, rabbit_variable_queue,
+           rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue,
+           sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees,
+           rabbit_queue_index, rabbit_exchange_decorator, gen, dict, ordsets,
+           file_handle_cache, rabbit_msg_store, array,
+           rabbit_msg_store_ets_index, rabbit_msg_file,
+           rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
+           mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow,
+           pmon, ssl_connection, tls_connection, ssl_record, tls_record,
+           gen_fsm, ssl]},
+         {ssl_apps, [asn1, crypto, public_key, ssl]}
+        ]}]}.
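(Aside: the two new application-environment keys above are what later hunks in this commit read back at runtime — rabbit.erl fetches hipe_modules and drops modules not on the code path, while rabbit_networking.erl and rabbit_release.erl fetch ssl_apps. A minimal sketch of that pattern; the module name is illustrative and not part of the commit:)

    -module(rabbit_env_demo).
    -export([hipe_candidates/0, ssl_apps/0]).

    %% Fetch the HiPE-worthy module list from the rabbit app env and keep
    %% only modules actually present on the code path, mirroring what
    %% hipe_compile/0 does in rabbit.erl.
    hipe_candidates() ->
        {ok, Mods} = application:get_env(rabbit, hipe_modules),
        [M || M <- Mods, code:which(M) =/= non_existing].

    %% Fetch the applications needed for SSL, as ensure_ssl/0 now does.
    ssl_apps() ->
        {ok, Apps} = application:get_env(rabbit, ssl_apps),
        Apps.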
diff --git a/packaging/standalone/src/rabbit_release.erl b/packaging/standalone/src/rabbit_release.erl
index dd68ee7e..f5e1ecf8 100644
--- a/packaging/standalone/src/rabbit_release.erl
+++ b/packaging/standalone/src/rabbit_release.erl
@@ -54,7 +54,9 @@ start() ->
          end,
     %% we need a list of ERTS apps we need to ship with rabbit
-    BaseApps = AllApps -- PluginAppNames,
+    {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
+
+    BaseApps = SslAppsConfig ++ AllApps -- PluginAppNames,
     AppVersions = [determine_version(App) || App <- BaseApps],
     RabbitVersion = proplists:get_value(rabbit, AppVersions),
diff --git a/src/delegate.erl b/src/delegate.erl
index 4e1dcd2e..7a06c1e4 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,15 +18,22 @@
 
 -behaviour(gen_server2).
 
--export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2, monitor/2,
+         demonitor/1, demonitor/2, call/2, cast/2]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
+-record(state, {node, monitors, name}).
+
 %%----------------------------------------------------------------------------
 
 -ifdef(use_specs).
 
+-export_type([monitor_ref/0]).
+
+-type(monitor_ref() :: reference() | {atom(), pid()}).
+
 -spec(start_link/1 ::
        (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
 -spec(invoke/2 ::
@@ -35,6 +42,10 @@
                                    [{pid(), term()}]}).
 -spec(invoke_no_result/2 ::
        (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(monitor/2 :: ('process', pid()) -> monitor_ref()).
+-spec(demonitor/1 :: (monitor_ref()) -> 'true').
+-spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true').
+
 -spec(call/2 ::
        ( pid(),  any()) -> any();
        ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
@@ -50,7 +61,8 @@
 %%----------------------------------------------------------------------------
 
 start_link(Num) ->
-    gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+    Name = delegate_name(Num),
+    gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
 
 invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
    Fun(Pid);
@@ -78,7 +90,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
        case orddict:fetch_keys(Grouped) of
            []          -> {[], []};
            RemoteNodes -> gen_server2:multi_call(
-                             RemoteNodes, delegate(RemoteNodes),
+                             RemoteNodes, delegate(self(), RemoteNodes),
                             {invoke, Fun, Grouped}, infinity)
        end,
    BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
@@ -106,12 +118,27 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
    {LocalPids, Grouped} = group_pids_by_node(Pids),
    case orddict:fetch_keys(Grouped) of
        []          -> ok;
-        RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
-                                          {invoke, Fun, Grouped})
+        RemoteNodes -> gen_server2:abcast(
+                         RemoteNodes, delegate(self(), RemoteNodes),
+                         {invoke, Fun, Grouped})
    end,
    safe_invoke(LocalPids, Fun), %% must not die
    ok.
 
+monitor(Type, Pid) when node(Pid) =:= node() ->
+    erlang:monitor(Type, Pid);
+monitor(Type, Pid) ->
+    Name = delegate(Pid, [node(Pid)]),
+    gen_server2:cast(Name, {monitor, Type, self(), Pid}),
+    {Name, Pid}.
+
+demonitor(Ref) -> ?MODULE:demonitor(Ref, []).
+
+demonitor(Ref, Options) when is_reference(Ref) ->
+    erlang:demonitor(Ref, Options);
+demonitor({Name, Pid}, Options) ->
+    gen_server2:cast(Name, {demonitor, Pid, Options}).
+
 call(PidOrPids, Msg) ->
    invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
 
@@ -134,10 +161,10 @@ group_pids_by_node(Pids) ->
 delegate_name(Hash) ->
    list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate(RemoteNodes) ->
+delegate(Pid, RemoteNodes) ->
    case get(delegate) of
        undefined -> Name = delegate_name(
-                              erlang:phash2(self(),
+                              erlang:phash2(Pid,
                                            delegate_sup:count(RemoteNodes))),
                     put(delegate, Name),
                     Name;
@@ -155,22 +182,48 @@ safe_invoke(Pid, Fun) when is_pid(Pid) ->
 
 %%----------------------------------------------------------------------------
 
-init([]) ->
-    {ok, node(), hibernate,
+init([Name]) ->
+    {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
 
-handle_call({invoke, Fun, Grouped}, _From, Node) ->
-    {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-
-handle_cast({invoke, Fun, Grouped}, Node) ->
+handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
+    {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}.
+
+handle_cast({monitor, Type, WantsMonitor, Pid},
+            State = #state{monitors = Monitors}) ->
+    Ref = erlang:monitor(Type, Pid),
+    Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors),
+    {noreply, State#state{monitors = Monitors1}, hibernate};
+
+handle_cast({demonitor, Pid, Options},
+            State = #state{monitors = Monitors}) ->
+    {noreply, case dict:find(Pid, Monitors) of
+                  {ok, {_WantsMonitor, Ref}} ->
+                      erlang:demonitor(Ref, Options),
+                      State#state{monitors = dict:erase(Pid, Monitors)};
+                  error ->
+                      State
+              end, hibernate};
+
+handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
    safe_invoke(orddict:fetch(Node, Grouped), Fun),
-    {noreply, Node, hibernate}.
+    {noreply, State, hibernate}.
+
+handle_info({'DOWN', Ref, process, Pid, Info},
+            State = #state{monitors = Monitors, name = Name}) ->
+    {noreply, case dict:find(Pid, Monitors) of
+                  {ok, {WantsMonitor, Ref}} ->
+                      WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info},
+                      State#state{monitors = dict:erase(Pid, Monitors)};
+                  error ->
+                      State
+              end, hibernate};
 
-handle_info(_Info, Node) ->
-    {noreply, Node, hibernate}.
+handle_info(_Info, State) ->
+    {noreply, State, hibernate}.
 
 terminate(_Reason, _State) ->
    ok.
 
-code_change(_OldVsn, Node, _Extra) ->
-    {ok, Node}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
diff --git a/src/gm.erl b/src/gm.erl
@@ -81,6 +81,12 @@
 %% Provide the Pid. Returns a proplist with various facts, including
 %% the group name and the current group members.
 %%
+%% validate_members/2
+%% Check whether a given member list agrees with the chosen member's
+%% view. Any differences will be communicated via the members_changed
+%% callback. If there are no differences then there will be no reply.
+%% Note that members will not necessarily share the same view.
+%%
 %% forget_group/1
 %% Provide the group name. Removes its mnesia record. Makes no attempt
 %% to ensure the group is empty.
@@ -377,7 +383,7 @@
 -behaviour(gen_server2).
 
 -export([create_tables/0, start_link/4, leave/1, broadcast/2,
-         confirmed_broadcast/2, info/1, forget_group/1]).
+         confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3, prioritise_info/3]).
@@ -438,6 +444,7 @@
 -spec(broadcast/2 :: (pid(), any()) -> 'ok').
 -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
 -spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(validate_members/2 :: (pid(), [pid()]) -> 'ok').
 -spec(forget_group/1 :: (group_name()) -> 'ok').
 %% The joined, members_changed and handle_msg callbacks can all return
@@ -524,6 +531,9 @@ confirmed_broadcast(Server, Msg) ->
 info(Server) ->
    gen_server2:call(Server, info, infinity).
 
+validate_members(Server, Members) ->
+    gen_server2:cast(Server, {validate_members, Members}).
+
 forget_group(GroupName) ->
    {atomic, ok} = mnesia:sync_transaction(
                     fun () ->
@@ -659,6 +669,19 @@ handle_cast(join, State = #state { self = Self,
    handle_callback_result(
      {Module:joined(Args, get_pids(all_known_members(View))), State1});
 
+handle_cast({validate_members, OldMembers},
+            State = #state { view          = View,
+                             module        = Module,
+                             callback_args = Args }) ->
+    NewMembers = get_pids(all_known_members(View)),
+    Births = NewMembers -- OldMembers,
+    Deaths = OldMembers -- NewMembers,
+    case {Births, Deaths} of
+        {[], []} -> noreply(State);
+        _        -> Result = Module:members_changed(Args, Births, Deaths),
+                    handle_callback_result({Result, State})
+    end;
+
 handle_cast(leave, State) ->
    {stop, normal, State}.
 
@@ -1053,7 +1076,7 @@ prune_or_create_group(Self, GroupName, TxnFun) ->
          fun () ->
                  GroupNew = #gm_group { name    = GroupName,
                                         members = [Self],
-                                         version = ?VERSION_START },
+                                         version = get_version(Self) },
                  case mnesia:read({?GROUP_TABLE, GroupName}) of
                      [] ->
                          mnesia:write(GroupNew),
@@ -1294,6 +1317,8 @@ remove_erased_members(MembersState, View) ->
                                           MembersState1)
                end, blank_member_state(), all_known_members(View)).
 
+get_version({Version, _Pid}) -> Version.
+
 get_pid({_Version, Pid}) -> Pid.
 
 get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
diff --git a/src/pmon.erl b/src/pmon.erl
index b9db66fb..86308167 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -16,22 +16,26 @@
 
 -module(pmon).
 
--export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
-         monitored/1, is_empty/1]).
+-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2,
+         is_monitored/2, erase/2, monitored/1, is_empty/1]).
 
 -compile({no_auto_import, [monitor/2]}).
 
+-record(state, {dict, module}).
+
 -ifdef(use_specs).
 
 %%----------------------------------------------------------------------------
 
 -export_type([?MODULE/0]).
 
--opaque(?MODULE() :: dict()).
+-opaque(?MODULE() :: #state{dict   :: dict(),
+                            module :: atom()}).
 
 -type(item() :: pid() | {atom(), node()}).
 
 -spec(new/0 :: () -> ?MODULE()).
+-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()).
 -spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
 -spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
 -spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
@@ -42,29 +46,33 @@
 
 -endif.
 
-new() -> dict:new().
+new() -> new(erlang).
+
+new(Module) -> #state{dict   = dict:new(),
+                      module = Module}.
 
-monitor(Item, M) ->
+monitor(Item, S = #state{dict = M, module = Module}) ->
    case dict:is_key(Item, M) of
-        true  -> M;
-        false -> dict:store(Item, erlang:monitor(process, Item), M)
+        true  -> S;
+        false -> S#state{dict = dict:store(
+                                  Item, Module:monitor(process, Item), M)}
    end.
 
-monitor_all([],     M) -> M;                %% optimisation
-monitor_all([Item], M) -> monitor(Item, M); %% optimisation
-monitor_all(Items,  M) -> lists:foldl(fun monitor/2, M, Items).
+monitor_all([],     S) -> S;                %% optimisation
+monitor_all([Item], S) -> monitor(Item, S); %% optimisation
+monitor_all(Items,  S) -> lists:foldl(fun monitor/2, S, Items).
 
-demonitor(Item, M) ->
+demonitor(Item, S = #state{dict = M, module = Module}) ->
    case dict:find(Item, M) of
-        {ok, MRef} -> erlang:demonitor(MRef),
-                      dict:erase(Item, M);
+        {ok, MRef} -> Module:demonitor(MRef),
+                      S#state{dict = dict:erase(Item, M)};
        error      -> M
    end.
-is_monitored(Item, M) -> dict:is_key(Item, M).
+is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M).
 
-erase(Item, M) -> dict:erase(Item, M).
+erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}.
 
-monitored(M) -> dict:fetch_keys(M).
+monitored(#state{dict = M}) -> dict:fetch_keys(M).
 
-is_empty(M) -> dict:size(M) == 0.
+is_empty(#state{dict = M}) -> dict:size(M) == 0.
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 3d9e7c6a..6995c3be 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -51,7 +51,7 @@
 -type(q() :: pqueue()).
 -type(priority() :: integer() | 'infinity').
--type(squeue() :: {queue, [any()], [any()]}).
+-type(squeue() :: {queue, [any()], [any()], non_neg_integer()}).
 -type(pqueue() ::  squeue() | {pqueue, [{priority(), squeue()}]}).
 
 -spec(new/0 :: () -> pqueue()).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 46e3d0e4..eae3b802 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -192,22 +192,6 @@
 
 -define(APPS, [os_mon, mnesia, rabbit]).
 
-%% see bug 24513 for how this list was created
--define(HIPE_WORTHY,
-        [rabbit_reader, rabbit_channel, gen_server2,
-         rabbit_exchange, rabbit_command_assembler, rabbit_framing_amqp_0_9_1,
-         rabbit_basic, rabbit_event, lists, queue, priority_queue,
-         rabbit_router, rabbit_trace, rabbit_misc, rabbit_binary_parser,
-         rabbit_exchange_type_direct, rabbit_guid, rabbit_net,
-         rabbit_amqqueue_process, rabbit_variable_queue,
-         rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue,
-         sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees,
-         rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
-         rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
-         rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
-         mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon,
-         ssl_connection, ssl_record, gen_fsm, ssl]).
-
 %% HiPE compilation uses multiple cores anyway, but some bits are
 %% IO-bound so we can go faster if we parallelise a bit more. In
 %% practice 2 processes seems just as fast as any other number > 1,
@@ -281,7 +265,9 @@ warn_if_hipe_compilation_failed(false) ->
 %% long time, so make an exception to our no-stdout policy and display
 %% progress via stdout.
 hipe_compile() ->
-    Count = length(?HIPE_WORTHY),
+    {ok, HipeModulesAll} = application:get_env(rabbit, hipe_modules),
+    HipeModules = [HM || HM <- HipeModulesAll, code:which(HM) =/= non_existing],
+    Count = length(HipeModules),
    io:format("~nHiPE compiling:  |~s|~n                 |",
              [string:copies("-", Count)]),
    T1 = erlang:now(),
@@ -290,7 +276,7 @@ hipe_compile() ->
                              io:format("#")
                          end || M <- Ms]
                end) ||
-        Ms <- split(?HIPE_WORTHY, ?HIPE_PROCESSES)],
+        Ms <- split(HipeModules, ?HIPE_PROCESSES)],
    [receive
         {'DOWN', MRef, process, _, normal} -> ok;
         {'DOWN', MRef, process, _, Reason} -> exit(Reason)
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ebbe4bab..6e0eb9bf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -146,7 +146,7 @@ init_state(Q) ->
               exclusive_consumer  = none,
               has_had_consumers   = false,
               active_consumers    = queue:new(),
-               senders             = pmon:new(),
+               senders             = pmon:new(delegate),
               msg_id_to_channel   = gb_trees:empty(),
               status              = running},
    rabbit_event:init_stats_timer(State, #q.stats_timer).
@@ -549,10 +549,8 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
                    {{Message, Delivered, undefined}, true, discard(Delivery, State1)}
            end, false, State#q{backing_queue_state = BQS1});
-        {published, BQS1} ->
-            {true, State#q{backing_queue_state = BQS1}};
-        {discarded, BQS1} ->
-            {true, discard(Delivery, State#q{backing_queue_state = BQS1})}
+        {true, BQS1} ->
+            {true, State#q{backing_queue_state = BQS1}}
    end.
 
 deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 5739c7f3..a5b91867 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -181,8 +181,7 @@ partition_value(Partition) ->
 all_partitions(PartitionedWith) ->
    Nodes = rabbit_mnesia:cluster_nodes(all),
    Partitions = [{node(), PartitionedWith} |
-                  [rpc:call(Node, rabbit_node_monitor, partitions, [])
-                   || Node <- Nodes -- [node()]]],
+                  rabbit_node_monitor:partitions(Nodes -- [node()])],
    all_partitions(Partitions, [Nodes]).
 
 all_partitions([], Partitions) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index f05e46e9..61b504bc 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -90,10 +90,7 @@
    -> {ack(), state()}.
 
 %% Called to inform the BQ about messages which have reached the
-%% queue, but are not going to be further passed to BQ for some
-%% reason. Note that this may be invoked for messages for which
-%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
-%% BQS}.
+%% queue, but are not going to be further passed to BQ.
 -callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
 
 %% Return ids of messages which have been confirmed since the last
@@ -216,11 +213,10 @@
 -callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state().
 
 %% Called prior to a publish or publish_delivered call. Allows the BQ
-%% to signal that it's already seen this message (and in what capacity
-%% - i.e. was it published previously or discarded previously) and
-%% thus the message should be dropped.
+%% to signal that it's already seen this message, (e.g. it was published
+%% or discarded previously) and thus the message should be dropped.
 -callback is_duplicate(rabbit_types:basic_message(), state())
-    -> {'false'|'published'|'discarded', state()}.
+    -> {boolean(), state()}.
 
 -else.
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 94891629..fee377e7 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,11 +42,20 @@ start_link() ->
            SupPid,
            {collector, {rabbit_queue_collector, start_link, []},
             intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+    %% We need to get channels in the hierarchy here so they close
+    %% before the reader. But for 1.0 readers we can't start the real
+    %% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) -
+    %% so we add another supervisor into the hierarchy.
+    {ok, ChannelSup3Pid} =
+        supervisor2:start_child(
+          SupPid,
+          {channel_sup3, {rabbit_intermediate_sup, start_link, []},
+           intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}),
    {ok, ReaderPid} =
        supervisor2:start_child(
          SupPid,
          {reader, {rabbit_reader, start_link,
-                    [SupPid, Collector,
+                    [ChannelSup3Pid, Collector,
                     rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
           intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
    {ok, SupPid, ReaderPid}.
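(Aside: the queue-process and slave hunks in this commit switch to pmon:new(delegate), which routes monitor set-up through the new delegate:monitor/2 so that monitors on many remote channel pids are multiplexed over a few delegate servers. A rough, self-contained usage sketch — the module and function names are illustrative, not from the commit; note that the 'DOWN' reference can be a {DelegateName, Pid} tuple rather than a plain reference():)

    -module(sender_tracking_demo).
    -export([track_senders/1]).

    %% Monitor a batch of sender pids via a delegate-backed pmon, the way
    %% rabbit_amqqueue_process now initialises `senders = pmon:new(delegate)'.
    track_senders(SenderPids) ->
        Mon0 = pmon:new(delegate),
        Mon1 = pmon:monitor_all(SenderPids, Mon0),
        receive
            {'DOWN', _Ref, process, Pid, _Reason} ->
                %% Forget the dead sender; delegate forwards the 'DOWN'
                %% with {Name, Pid} in the reference position.
                pmon:erase(Pid, Mon1)
        end.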
diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_intermediate_sup.erl
new file mode 100644
index 00000000..1919d9d6
--- /dev/null
+++ b/src/rabbit_intermediate_sup.erl
@@ -0,0 +1,39 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2013 VMware, Inc.  All rights reserved.
+%%
+
+-module(rabbit_intermediate_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+    supervisor2:start_link(?MODULE, []).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+    {ok, {{one_for_one, 10, 10}, []}}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 6791389e..3abd81f5 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -225,21 +225,10 @@ discard(MsgId, ChPid, State = #state { gm                  = GM,
                                       backing_queue       = BQ,
                                       backing_queue_state = BQS,
                                       seen_status         = SS }) ->
-    %% It's a massive error if we get told to discard something that's
-    %% already been published or published-and-confirmed. To do that
-    %% would require non FIFO access. Hence we should not find
-    %% 'published' or 'confirmed' in this dict:find.
-    case dict:find(MsgId, SS) of
-        error ->
-            ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
-            BQS1 = BQ:discard(MsgId, ChPid, BQS),
-            ensure_monitoring(
-              ChPid, State #state {
-                       backing_queue_state = BQS1,
-                       seen_status         = dict:erase(MsgId, SS) });
-        {ok, discarded} ->
-            State
-    end.
+    false = dict:is_key(MsgId, SS), %% ASSERTION
+    ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
+    ensure_monitoring(ChPid, State #state { backing_queue_state =
+                                                BQ:discard(MsgId, ChPid, BQS) }).
 
 dropwhile(Pred, State = #state{backing_queue       = BQ,
                               backing_queue_state = BQS }) ->
@@ -393,8 +382,9 @@ is_duplicate(Message = #basic_message { id = MsgId },
            %% immediately after calling is_duplicate). The msg is
            %% invalid. We will not see this again, nor will we be
            %% further involved in confirming this message, so erase.
-            {published, State #state { seen_status = dict:erase(MsgId, SS) }};
-        {ok, confirmed} ->
+            {true, State #state { seen_status = dict:erase(MsgId, SS) }};
+        {ok, Disposition}
+          when Disposition =:= confirmed
            %% It got published when we were a slave via gm, and
            %% confirmed some time after that (maybe even after
            %% promotion), but before we received the publish from the
            %% channel, so couldn't previously know what the
            %% msg_seq_no was (and thus confirm as a slave). So we
            %% need to confirm now. As above, amqqueue_process will
            %% have the entry for the msg_id_to_channel mapping added
            %% immediately after calling is_duplicate/2.
-            {published, State #state { seen_status = dict:erase(MsgId, SS),
-                                       confirmed = [MsgId | Confirmed] }};
-        {ok, discarded} ->
-            %% Don't erase from SS here because discard/2 is about to
-            %% be called and we need to be able to detect this case
-            {discarded, State}
+            orelse Disposition =:= discarded ->
+            %% Message was discarded while we were a slave. Confirm now.
+            %% As above, amqqueue_process will have the entry for the
+            %% msg_id_to_channel mapping.
+            {true, State #state { seen_status = dict:erase(MsgId, SS),
+                                  confirmed = [MsgId | Confirmed] }}
    end.
 
 %% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 38e0da3f..1996fd0a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -100,7 +100,7 @@ init(Q = #amqqueue { name = QName }) ->
    Node = node(),
    case rabbit_misc:execute_mnesia_transaction(
           fun() -> init_it(Self, GM, Node, QName) end) of
-        {new, QPid} ->
+        {new, QPid, GMPids} ->
            erlang:monitor(process, QPid),
            ok = file_handle_cache:register_callback(
                   rabbit_amqqueue, set_maximum_since_use, [Self]),
@@ -120,13 +120,14 @@ init(Q = #amqqueue { name = QName }) ->
                               msg_id_ack          = dict:new(),
                               msg_id_status       = dict:new(),
-                               known_senders       = pmon:new(),
+                               known_senders       = pmon:new(delegate),
 
                               depth_delta         = undefined
                             },
            rabbit_event:notify(queue_slave_created,
                                infos(?CREATION_EVENT_KEYS, State)),
            ok = gm:broadcast(GM, request_depth),
+            ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
            {ok, State, hibernate,
             {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
              ?DESIRED_HIBERNATE}};
@@ -144,7 +145,7 @@ init_it(Self, GM, Node, QName) ->
          mnesia:read({rabbit_queue, QName}),
    case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
        []     -> add_slave(Q, Self, GM),
-                  {new, QPid};
+                  {new, QPid, GMPids};
        [QPid] -> case rabbit_misc:is_process_alive(QPid) of
                      true  -> duplicate_live_master;
                      false -> {stale, QPid}
@@ -156,7 +157,7 @@ init_it(Self, GM, Node, QName) ->
                                     gm_pids = [T || T = {_, S} <- GMPids,
                                                     S =/= SPid] },
                  add_slave(Q1, Self, GM),
-                  {new, QPid}
+                  {new, QPid, GMPids}
        end
    end.
 
@@ -273,7 +274,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
    noreply(State);
 
 handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
-    noreply(local_sender_death(ChPid, State));
+    local_sender_death(ChPid, State),
+    noreply(State);
 
 handle_info({'EXIT', _Pid, Reason}, State) ->
    {stop, Reason, State};
@@ -604,7 +606,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
 ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
    State #state { known_senders = pmon:monitor(ChPid, KS) }.
 
-local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+local_sender_death(ChPid, #state { known_senders = KS }) ->
    %% The channel will be monitored iff we have received a delivery
    %% from it but not heard about its death from the master. So if it
    %% is monitored we need to point the death out to the master (see
@@ -612,8 +614,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) ->
    ok = case pmon:is_monitored(ChPid, KS) of
             false -> ok;
             true  -> confirm_sender_death(ChPid)
-         end,
-    State.
+         end.
 
 confirm_sender_death(Pid) ->
    %% We have to deal with the possibility that we'll be promoted to
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index ea9bc7d7..5fa29b7e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -56,7 +56,8 @@
 
 %% Main interface
 -spec(init/0 :: () -> 'ok').
--spec(join_cluster/2 :: (node(), node_type()) -> 'ok').
+-spec(join_cluster/2 :: (node(), node_type())
+                        -> 'ok' | {'ok', 'already_member'}).
 -spec(reset/0 :: () -> 'ok').
 -spec(force_reset/0 :: () -> 'ok').
 -spec(update_cluster_nodes/1 :: (node()) -> 'ok').
@@ -164,23 +165,24 @@ join_cluster(DiscoveryNode, NodeType) ->
        {error, _} = E -> throw(E)
    end,
    case me_in_nodes(ClusterNodes) of
-        true  -> e(already_clustered);
-        false -> ok
-    end,
-
-    %% reset the node. this simplifies things and it will be needed in
-    %% this case - we're joining a new cluster with new nodes which
-    %% are not in synch with the current node. I also lifts the burden
-    %% of reseting the node from the user.
-    reset_gracefully(),
-
-    %% Join the cluster
-    rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
-                               [ClusterNodes, NodeType]),
-    ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
-    rabbit_node_monitor:notify_joined_cluster(),
-
-    ok.
+        false ->
+            %% reset the node. this simplifies things and it will be needed in
+            %% this case - we're joining a new cluster with new nodes which
+            %% are not in synch with the current node. I also lifts the burden
+            %% of reseting the node from the user.
+            reset_gracefully(),
+
+            %% Join the cluster
+            rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
+                                       [ClusterNodes, NodeType]),
+            ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
+            rabbit_node_monitor:notify_joined_cluster(),
+            ok;
+        true ->
+            rabbit_misc:local_info_msg("Already member of cluster: ~p~n",
+                                       [ClusterNodes]),
+            {ok, already_member}
+    end.
 
 %% return node to its virgin state, where it is not member of any
 %% cluster, has no cluster configuration, no local database, and no
@@ -294,27 +296,18 @@ remove_node_offline_node(Node) ->
    %% this operation from disc nodes.
    case {mnesia:system_info(running_db_nodes) -- [Node], node_type()} of
        {[], disc} ->
-            %% Note that while we check if the nodes was the last to go down,
-            %% apart from the node we're removing from, this is still unsafe.
-            %% Consider the situation in which A and B are clustered. A goes
-            %% down, and records B as the running node. Then B gets clustered
-            %% with C, C goes down and B goes down. In this case, C is the
-            %% second-to-last, but we don't know that and we'll remove B from A
-            %% anyway, even if that will lead to bad things.
-            case cluster_nodes(running) -- [node(), Node] of
-                [] -> start_mnesia(),
-                      try
-                          %% What we want to do here is replace the last node to
-                          %% go down with the current node. The way we do this
-                          %% is by force loading the table, and making sure that
-                          %% they are loaded.
-                          rabbit_table:force_load(),
-                          rabbit_table:wait_for_replicated(),
-                          forget_cluster_node(Node, false)
-                      after
-                          stop_mnesia()
-                      end;
-                _  -> e(not_last_node_to_go_down)
+            start_mnesia(),
+            try
+                %% What we want to do here is replace the last node to
+                %% go down with the current node. The way we do this
+                %% is by force loading the table, and making sure that
+                %% they are loaded.
+                rabbit_table:force_load(),
+                rabbit_table:wait_for_replicated(),
+                forget_cluster_node(Node, false),
+                force_load_next_boot()
+            after
+                stop_mnesia()
            end;
        {_, _} ->
            e(removing_node_from_offline_node)
@@ -339,8 +332,7 @@ status() ->
    end.
 
 mnesia_partitions(Nodes) ->
-    {Replies, _BadNodes} = rpc:multicall(
-                             Nodes, rabbit_node_monitor, partitions, []),
+    Replies = rabbit_node_monitor:partitions(Nodes),
    [Reply || Reply = {_, R} <- Replies, R =/= []].
 
 is_running() -> mnesia:system_info(is_running) =:= yes.
@@ -439,11 +431,13 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
            ok = create_schema();
        {[], true, disc} ->
            %% First disc node up
+            maybe_force_load(),
            ok;
        {[AnotherNode | _], _, _} ->
            %% Subsequent node in cluster, catch up
            ensure_version_ok(
              rpc:call(AnotherNode, rabbit_version, recorded, [])),
+            maybe_force_load(),
            ok = rabbit_table:wait_for_replicated(),
            ok = rabbit_table:create_local_copy(NodeType)
    end,
@@ -523,6 +517,19 @@ copy_db(Destination) ->
    ok = ensure_mnesia_not_running(),
    rabbit_file:recursive_copy(dir(), Destination).
 
+force_load_filename() ->
+    filename:join(rabbit_mnesia:dir(), "force_load").
+
+force_load_next_boot() ->
+    rabbit_file:write_file(force_load_filename(), <<"">>).
+
+maybe_force_load() ->
+    case rabbit_file:is_file(force_load_filename()) of
+        true  -> rabbit_table:force_load(),
+                 rabbit_file:delete(force_load_filename());
+        false -> ok
+    end.
+
 %% This does not guarantee us much, but it avoids some situations that
 %% will definitely end up badly
 check_cluster_consistency() ->
@@ -853,10 +860,6 @@ error_description(clustering_only_disc_node) ->
 error_description(resetting_only_disc_node) ->
    "You cannot reset a node when it is the only disc node in a cluster. "
    "Please convert another node of the cluster to a disc node first.";
-error_description(already_clustered) ->
-    "You are already clustered with the nodes you have selected. If the "
-    "node you are trying to cluster with is not present in the current "
-    "node status, use 'update_cluster_nodes'.";
 error_description(not_clustered) ->
    "Non-clustered nodes can only be disc nodes.";
 error_description(cannot_connect_to_cluster) ->
@@ -879,10 +882,6 @@ error_description(offline_node_no_offline_flag) ->
    "You are trying to remove a node from an offline node. That is dangerous, "
    "but can be done with the --offline flag. Please consult the manual "
    "for rabbitmqctl for more information.";
-error_description(not_last_node_to_go_down) ->
-    "The node you are trying to remove from was not the last to go down "
-    "(excluding the node you are removing). Please use the the last node "
-    "to go down to remove nodes when the cluster is offline.";
 error_description(removing_node_from_offline_node) ->
    "To remove a node remotely from an offline node, the node you are removing "
    "from must be a disc node and all the other nodes must be offline.";
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 6ed6239c..46cfabe3 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -145,7 +145,8 @@ start() -> rabbit_sup:start_supervisor_child(
                {rabbit_connection_sup,start_link,[]}]).
 
 ensure_ssl() ->
-    ok = app_utils:start_applications([asn1, crypto, public_key, ssl]),
+    {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
+    ok = app_utils:start_applications(SslAppsConfig),
    {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
 
    % unknown_ca errors are silently ignored prior to R14B unless we
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index c1de914f..805f1b2b 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -24,7 +24,7 @@
         write_cluster_status/1, read_cluster_status/0,
         update_cluster_status/0, reset_cluster_status/0]).
 -export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
--export([partitions/0, subscribe/1]).
+-export([partitions/0, partitions/1, subscribe/1]).
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).
@@ -57,7 +57,8 @@
 -spec(notify_joined_cluster/0 :: () -> 'ok').
 -spec(notify_left_cluster/1 :: (node()) -> 'ok').
 
--spec(partitions/0 :: () -> {node(), [node()]}).
+-spec(partitions/0 :: () -> [node()]).
+-spec(partitions/1 :: ([node()]) -> [{node(), [node()]}]).
 -spec(subscribe/1 :: (pid()) -> 'ok').
 
 -spec(all_rabbit_nodes_up/0 :: () -> boolean()).
@@ -187,6 +188,10 @@ notify_left_cluster(Node) ->
 partitions() ->
    gen_server:call(?SERVER, partitions, infinity).
 
+partitions(Nodes) ->
+    {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, infinity),
+    Replies.
+
 subscribe(Pid) ->
    gen_server:cast(?SERVER, {subscribe, Pid}).
 
@@ -208,7 +213,7 @@ init([]) ->
                autoheal = rabbit_autoheal:init()}}.
 
 handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
-    {reply, {node(), Partitions}, State};
+    {reply, Partitions, State};
 
 handle_call(_Request, _From, State) ->
    {noreply, State}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 5e633f23..9b6039d1 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -37,7 +37,7 @@
 
 -record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
             connection_state, queue_collector, heartbeater, stats_timer,
-             conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun,
+             ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun,
             buf, buf_len, throttle}).
 
 -record(connection, {name, host, peer_host, port, peer_port,
@@ -103,19 +103,19 @@
 
 %%--------------------------------------------------------------------------
 
-start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
-    {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid,
+start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) ->
+    {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid,
                                             Collector, StartHeartbeatFun])}.
 
 shutdown(Pid, Explanation) ->
    gen_server:call(Pid, {shutdown, Explanation}, infinity).
 
-init(Parent, ConnSupPid, Collector, StartHeartbeatFun) ->
+init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) ->
    Deb = sys:debug_options([]),
    receive
        {go, Sock, SockTransform} ->
            start_connection(
-              Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+              Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock,
              SockTransform)
    end.
 
@@ -201,7 +201,7 @@ socket_op(Sock, Fun) ->
            exit(normal)
    end.
 
-start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
+start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
                 Sock, SockTransform) ->
    process_flag(trap_exit, true),
    Name = case rabbit_net:connection_string(Sock, inbound) of
@@ -240,7 +240,7 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
                connection_state    = pre_init,
                queue_collector     = Collector,
                heartbeater         = none,
-                conn_sup_pid        = ConnSupPid,
+                ch_sup3_pid         = ChSup3Pid,
                channel_sup_sup_pid = none,
                start_heartbeat_fun = StartHeartbeatFun,
                buf                 = [],
@@ -756,6 +756,9 @@ refuse_connection(Sock, Exception, {A, B, C, D}) ->
    ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end),
    throw(Exception).
 
+-ifdef(use_specs).
+-spec(refuse_connection/2 :: (rabbit_net:socket(), any()) -> no_return()).
+-endif.
 refuse_connection(Sock, Exception) ->
    refuse_connection(Sock, Exception, {0, 0, 9, 1}).
@@ -837,7 +840,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
                           connection = Connection = #connection{
                                          user = User,
                                          protocol = Protocol},
-                           conn_sup_pid = ConnSupPid,
+                           ch_sup3_pid = ChSup3Pid,
                           sock = Sock,
                           throttle = Throttle}) ->
    ok = rabbit_access_control:check_vhost_access(User, VHostPath),
@@ -847,7 +850,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
    Throttle1 = Throttle#throttle{conserve_resources = Conserve},
    {ok, ChannelSupSupPid} =
        supervisor2:start_child(
-          ConnSupPid,
+          ChSup3Pid,
          {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
           intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
    State1 = control_throttle(
@@ -1048,9 +1051,9 @@ pack_for_1_0(#v1{parent       = Parent,
                 recv_len     = RecvLen,
                 pending_recv = PendingRecv,
                 queue_collector = QueueCollector,
-                 conn_sup_pid = ConnSupPid,
+                 ch_sup3_pid  = ChSup3Pid,
                 start_heartbeat_fun = SHF,
                 buf          = Buf,
                 buf_len      = BufLen}) ->
-    {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF,
+    {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF,
     Buf, BufLen}.
diff --git a/version.mk b/version.mk
new file mode 100644
index 00000000..5683af4a
--- /dev/null
+++ b/version.mk
@@ -0,0 +1 @@
+VERSION?=0.0.0
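(Aside: with the rabbit_mnesia change above, callers of join_cluster/2 now have to accept {ok, already_member} rather than catching an already_clustered error. A hedged sketch of such a caller; the module and function names are illustrative only:)

    -module(cluster_join_demo).
    -export([ensure_clustered/1]).

    %% Join DiscoveryNode's cluster as a disc node, treating "already a
    %% member" as success rather than an error, per the new contract.
    ensure_clustered(DiscoveryNode) ->
        case rabbit_mnesia:join_cluster(DiscoveryNode, disc) of
            ok                   -> joined;
            {ok, already_member} -> already_member
        end.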