diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-08-11 18:48:23 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-08-11 18:48:23 +0100 |
commit | 0725324ad4c4bd5648feff9c9f704388b2c6d067 (patch) | |
tree | cfa98020569de6b5fd087e1ac1f14cb6ba4482c6 | |
parent | 5757824244675fb7d0c71dcb464da8e8536c8013 (diff) | |
parent | f5ce49652d4e5c143e356969e9b7bc9e665e617d (diff) | |
download | rabbitmq-server-0725324ad4c4bd5648feff9c9f704388b2c6d067.tar.gz |
merge from default
32 files changed, 992 insertions, 346 deletions
@@ -20,6 +20,8 @@ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) +QC_MODULES := rabbit_backing_queue_qc +QC_TRIALS ?= 100 ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python @@ -45,8 +47,14 @@ ifndef USE_SPECS USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8,4]), halt().') endif +ifndef USE_PROPER_QC +# PropEr needs to be installed for property checking +# http://proper.softlab.ntua.gr/ +USE_PROPER_QC:=$(shell erl -noshell -eval 'io:format({module, proper} =:= code:ensure_loaded(proper)), halt().') +endif + #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests -ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs) +ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) VERSION=0.0.0 TARBALL_NAME=rabbitmq-server-$(VERSION) @@ -69,6 +77,10 @@ define usage_dep $(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl endef +define boolean_macro +$(if $(filter true,$(1)),-D$(2)) +endef + ifneq "$(SBIN_DIR)" "" ifneq "$(TARGET_DIR)" "" SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) @@ -165,6 +177,9 @@ run-tests: all OUT=$$(echo "rabbit_tests:all_tests()." | $(ERL_CALL)) ; \ echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null +run-qc: all + $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS)) + start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ @@ -223,7 +238,7 @@ srcdist: distclean chmod 0755 $(TARGET_SRC_DIR)/scripts/* (cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME)) - (cd dist; zip -r $(TARBALL_NAME).zip $(TARBALL_NAME)) + (cd dist; zip -q -r $(TARBALL_NAME).zip $(TARBALL_NAME)) rm -rf $(TARGET_SRC_DIR) distclean: clean @@ -314,3 +329,4 @@ ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" "" -include $(DEPS_FILE) endif +.PHONY: run-qc diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index ffc826eb..11f5f01c 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -120,6 +120,9 @@ done rm -rf %{buildroot} %changelog +* Mon Jun 27 2011 simon@rabbitmq.com 2.5.1-1 +- New Upstream Release + * Thu Jun 9 2011 jerryk@vmware.com 2.5.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 31979a8e..38c81134 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -19,7 +19,7 @@ all: package: clean cp $(TARBALL_DIR)/$(TARBALL) $(DEBIAN_ORIG_TARBALL) - tar -zxvf $(DEBIAN_ORIG_TARBALL) + tar -zxf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ # Debian and descendants differ from most other distros in that diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 1cab4235..9063a6ed 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.5.1-1) lucid; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Mon, 27 Jun 2011 11:21:49 +0100 + rabbitmq-server (2.5.0-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile index c4e01f4a..b5c342aa 100644 --- a/packaging/generic-unix/Makefile +++ b/packaging/generic-unix/Makefile @@ -4,7 +4,7 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION) dist: - tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz + tar -zxf ../../dist/$(SOURCE_DIR).tar.gz $(MAKE) -C $(SOURCE_DIR) \ TARGET_DIR=`pwd`/$(TARGET_DIR) \ diff --git a/packaging/windows-exe/Makefile b/packaging/windows-exe/Makefile index 59803f9c..ab50e30b 100644 --- a/packaging/windows-exe/Makefile +++ b/packaging/windows-exe/Makefile @@ -2,7 +2,7 @@ VERSION=0.0.0 ZIP=../windows/rabbitmq-server-windows-$(VERSION) dist: rabbitmq-$(VERSION).nsi rabbitmq_server-$(VERSION) - makensis rabbitmq-$(VERSION).nsi + makensis -V2 rabbitmq-$(VERSION).nsi rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in sed \ @@ -10,7 +10,7 @@ rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in $< > $@ rabbitmq_server-$(VERSION): - unzip $(ZIP) + unzip -q $(ZIP) clean: rm -rf rabbitmq-*.nsi rabbitmq_server-* rabbitmq-server-*.exe diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in index 1ed4064e..27e4e1dc 100644 --- a/packaging/windows-exe/rabbitmq_nsi.in +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -113,17 +113,17 @@ Section "Start Menu" RabbitStartMenu CreateDirectory "$APPDATA\RabbitMQ\db" CreateDirectory "$SMPROGRAMS\RabbitMQ Server" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0 - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Plugins Directory.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Log Directory.lnk" "$APPDATA\RabbitMQ\log" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Database Directory.lnk" "$APPDATA\RabbitMQ\db" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\(Re)Install Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Remove Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Start Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Stop Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall RabbitMQ.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0 + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Plugins.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Logs.lnk" "$APPDATA\RabbitMQ\log" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Database Directory.lnk" "$APPDATA\RabbitMQ\db" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - (re)install.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - remove.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - start.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - stop.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico" SetOutPath "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Command Prompt (sbin dir).lnk" "$WINDIR\system32\cmd.exe" "" "$WINDIR\system32\cmd.exe" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Command Prompt (sbin dir).lnk" "$WINDIR\system32\cmd.exe" "" "$WINDIR\system32\cmd.exe" SetOutPath $INSTDIR SectionEnd diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index dacfa620..a0be8d89 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -4,7 +4,7 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_ZIP=rabbitmq-server-windows-$(VERSION) dist: - tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz + tar -zxf ../../dist/$(SOURCE_DIR).tar.gz $(MAKE) -C $(SOURCE_DIR) mkdir $(SOURCE_DIR)/sbin @@ -24,7 +24,7 @@ dist: elinks -dump -no-references -no-numbering rabbitmq-service.html \ > $(TARGET_DIR)/readme-service.txt todos $(TARGET_DIR)/readme-service.txt - zip -r $(TARGET_ZIP).zip $(TARGET_DIR) + zip -q -r $(TARGET_ZIP).zip $(TARGET_DIR) rm -rf $(TARGET_DIR) rabbitmq-service.html clean: clean_partial diff --git a/quickcheck b/quickcheck new file mode 100755 index 00000000..a36cf3ed --- /dev/null +++ b/quickcheck @@ -0,0 +1,36 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -sname quickcheck +-mode(compile). + +%% A helper to test quickcheck properties on a running broker +%% NodeStr is a local broker node name +%% ModStr is the module containing quickcheck properties +%% The number of trials is optional +main([NodeStr, ModStr | TrialsStr]) -> + {ok, Hostname} = inet:gethostname(), + Node = list_to_atom(NodeStr ++ "@" ++ Hostname), + Mod = list_to_atom(ModStr), + Trials = lists:map(fun erlang:list_to_integer/1, TrialsStr), + case rpc:call(Node, code, ensure_loaded, [proper]) of + {module, proper} -> + case rpc:call(Node, proper, module, [Mod] ++ Trials) of + [] -> ok; + _ -> quit(1) + end; + {badrpc, Reason} -> + io:format("Could not contact node ~p: ~p.~n", [Node, Reason]), + quit(2); + {error,nofile} -> + io:format("Module PropEr was not found on node ~p~n", [Node]), + quit(2) + end; +main([]) -> + io:format("This script requires a node name and a module.~n"). + +quit(Status) -> + case os:type() of + {unix, _} -> halt(Status); + {win32, _} -> init:stop(Status) + end. + diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 61b08d49..235e14c0 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -925,10 +925,10 @@ handle_cast({transfer, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), {noreply, process_pending( update_counts(obtain, ToPid, +1, - update_counts(obtain, FromPid, -1, State)))}; + update_counts(obtain, FromPid, -1, State)))}. -handle_cast(check_counts, State) -> - {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}. +handle_info(check_counts, State) -> + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #fhc_state { elders = Elders, @@ -1133,9 +1133,9 @@ reduce(State = #fhc_state { open_pending = OpenPending, end end, case TRef of - undefined -> {ok, TRef1} = timer:apply_after( - ?FILE_HANDLES_CHECK_INTERVAL, - gen_server, cast, [?SERVER, check_counts]), + undefined -> TRef1 = erlang:send_after( + ?FILE_HANDLES_CHECK_INTERVAL, ?SERVER, + check_counts), State #fhc_state { timer_ref = TRef1 }; _ -> State end. diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 43e0a8f5..35258139 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -67,6 +67,11 @@ %% module. Note there is no form also encompassing a reply, thus if %% you wish to reply in handle_call/3 and change the callback module, %% you need to use gen_server2:reply/2 to issue the reply manually. +%% +%% 8) The callback module can optionally implement +%% format_message_queue/2 which is the equivalent of format_status/2 +%% but where the second argument is specifically the priority_queue +%% which contains the prioritised message_queue. %% All modifications are (C) 2009-2011 VMware, Inc. @@ -593,41 +598,35 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, CurrentTO1 = Base + Extra, {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. -in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC, - queue = Queue }) -> - GS2State #gs2_state { queue = priority_queue:in( - {'$gen_cast', Msg}, - PC(Msg, GS2State), Queue) }; -in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC, - queue = Queue }) -> - GS2State #gs2_state { queue = priority_queue:in( - {'$gen_call', From, Msg}, - PC(Msg, From, GS2State), Queue) }; -in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) -> - GS2State #gs2_state { queue = priority_queue:in( - Input, PI(Input, GS2State), Queue) }. - -process_msg(Msg, - GS2State = #gs2_state { parent = Parent, - name = Name, - debug = Debug }) -> - case Msg of - {system, From, Req} -> - sys:handle_system_msg( - Req, From, Parent, ?MODULE, Debug, - GS2State); - %% gen_server puts Hib on the end as the 7th arg, but that - %% version of the function seems not to be documented so - %% leaving out for now. - {'EXIT', Parent, Reason} -> - terminate(Reason, Msg, GS2State); - _Msg when Debug =:= [] -> - handle_msg(Msg, GS2State); - _Msg -> - Debug1 = sys:handle_debug(Debug, fun print_event/3, - Name, {in, Msg}), - handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }) - end. +in({'$gen_cast', Msg} = Input, + GS2State = #gs2_state { prioritise_cast = PC }) -> + in(Input, PC(Msg, GS2State), GS2State); +in({'$gen_call', From, Msg} = Input, + GS2State = #gs2_state { prioritise_call = PC }) -> + in(Input, PC(Msg, From, GS2State), GS2State); +in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) -> + in(Input, infinity, GS2State); +in({system, _From, _Req} = Input, GS2State) -> + in(Input, infinity, GS2State); +in(Input, GS2State = #gs2_state { prioritise_info = PI }) -> + in(Input, PI(Input, GS2State), GS2State). + +in(Input, Priority, GS2State = #gs2_state { queue = Queue }) -> + GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }. + +process_msg({system, From, Req}, + GS2State = #gs2_state { parent = Parent, debug = Debug }) -> + sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State); +process_msg({'EXIT', Parent, Reason} = Msg, + GS2State = #gs2_state { parent = Parent }) -> + %% gen_server puts Hib on the end as the 7th arg, but that version + %% of the fun seems not to be documented so leaving out for now. + terminate(Reason, Msg, GS2State); +process_msg(Msg, GS2State = #gs2_state { debug = [] }) -> + handle_msg(Msg, GS2State); +process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) -> + Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}), + handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }). %%% --------------------------------------------------- %%% Send/recive functions @@ -1161,17 +1160,22 @@ format_status(Opt, StatusData) -> end, Header = lists:concat(["Status for generic server ", NameTag]), Log = sys:get_debug(log, Debug, []), - Specfic = - case erlang:function_exported(Mod, format_status, 2) of - true -> case catch Mod:format_status(Opt, [PDict, State]) of - {'EXIT', _} -> [{data, [{"State", State}]}]; - Else -> Else - end; - _ -> [{data, [{"State", State}]}] - end, + Specfic = callback(Mod, format_status, [Opt, [PDict, State]], + fun () -> [{data, [{"State", State}]}] end), + Messages = callback(Mod, format_message_queue, [Opt, Queue], + fun () -> priority_queue:to_list(Queue) end), [{header, Header}, {data, [{"Status", SysState}, {"Parent", Parent}, {"Logged events", Log}, - {"Queued messages", priority_queue:to_list(Queue)}]} | + {"Queued messages", Messages}]} | Specfic]. + +callback(Mod, FunName, Args, DefaultThunk) -> + case erlang:function_exported(Mod, FunName, length(Args)) of + true -> case catch apply(Mod, FunName, Args) of + {'EXIT', _} -> DefaultThunk(); + Success -> Success + end; + false -> DefaultThunk() + end. @@ -376,11 +376,11 @@ confirmed_broadcast/2, group_members/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_cast/2, prioritise_info/2]). + code_change/3, prioritise_info/2]). -export([behaviour_info/1]). --export([table_definitions/0, flush/1]). +-export([table_definitions/0]). -define(GROUP_TABLE, gm_group). -define(HIBERNATE_AFTER_MIN, 1000). @@ -511,9 +511,6 @@ confirmed_broadcast(Server, Msg) -> group_members(Server) -> gen_server2:call(Server, group_members, infinity). -flush(Server) -> - gen_server2:cast(Server, flush). - init([GroupName, Module, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), @@ -629,12 +626,12 @@ handle_cast(join, State = #state { self = Self, {Module:joined(Args, all_known_members(View)), State1}); handle_cast(leave, State) -> - {stop, normal, State}; + {stop, normal, State}. -handle_cast(flush, State) -> - noreply( - flush_broadcast_buffer(State #state { broadcast_timer = undefined })). +handle_info(flush, State) -> + noreply( + flush_broadcast_buffer(State #state { broadcast_timer = undefined })); handle_info({'DOWN', MRef, process, _Pid, _Reason}, State = #state { self = Self, @@ -684,9 +681,7 @@ terminate(Reason, State = #state { module = Module, code_change(_OldVsn, State, _Extra) -> {ok, State}. -prioritise_cast(flush, _State) -> 1; -prioritise_cast(_ , _State) -> 0. - +prioritise_info(flush, _State) -> 1; prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1; prioritise_info(_ , _State) -> 0. @@ -808,10 +803,10 @@ ensure_broadcast_timer(State = #state { broadcast_buffer = [], State; ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = TRef }) -> - timer:cancel(TRef), + erlang:cancel_timer(TRef), State #state { broadcast_timer = undefined }; ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> - {ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]), + TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush), State #state { broadcast_timer = TRef }; ensure_broadcast_timer(State) -> State. diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 4a94b24b..4fc8b469 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -47,7 +47,10 @@ -ifdef(use_specs). --type(priority() :: integer()). +-export_type([q/0]). + +-type(q() :: pqueue()). +-type(priority() :: integer() | 'infinity'). -type(squeue() :: {queue, [any()], [any()]}). -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). @@ -71,8 +74,9 @@ new() -> is_queue({queue, R, F}) when is_list(R), is_list(F) -> true; is_queue({pqueue, Queues}) when is_list(Queues) -> - lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end, - Queues); + lists:all(fun ({infinity, Q}) -> is_queue(Q); + ({P, Q}) -> is_integer(P) andalso is_queue(Q) + end, Queues); is_queue(_) -> false. @@ -89,7 +93,8 @@ len({pqueue, Queues}) -> to_list({queue, In, Out}) when is_list(In), is_list(Out) -> [{0, V} || V <- Out ++ lists:reverse(In, [])]; to_list({pqueue, Queues}) -> - [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)]. + [{maybe_negate_priority(P), V} || {P, Q} <- Queues, + {0, V} <- to_list(Q)]. in(Item, Q) -> in(Item, 0, Q). @@ -103,12 +108,20 @@ in(X, Priority, _Q = {queue, [], []}) -> in(X, Priority, Q = {queue, _, _}) -> in(X, Priority, {pqueue, [{0, Q}]}); in(X, Priority, {pqueue, Queues}) -> - P = -Priority, + P = maybe_negate_priority(Priority), {pqueue, case lists:keysearch(P, 1, Queues) of {value, {_, Q}} -> lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); + false when P == infinity -> + [{P, {queue, [X], []}} | Queues]; false -> - lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + case Queues of + [{infinity, InfQueue} | Queues1] -> + [{infinity, InfQueue} | + lists:keysort(1, [{P, {queue, [X], []}} | Queues1])]; + _ -> + lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + end end}. out({queue, [], []} = Q) -> @@ -141,7 +154,8 @@ join({queue, [], []}, B) -> join({queue, AIn, AOut}, {queue, BIn, BOut}) -> {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; join(A = {queue, _, _}, {pqueue, BPQ}) -> - {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + {Pre, Post} = + lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ), Post1 = case Post of [] -> [ {0, A} ]; [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; @@ -149,7 +163,8 @@ join(A = {queue, _, _}, {pqueue, BPQ}) -> end, {pqueue, Pre ++ Post1}; join({pqueue, APQ}, B = {queue, _, _}) -> - {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + {Pre, Post} = + lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ), Post1 = case Post of [] -> [ {0, B} ]; [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; @@ -165,7 +180,7 @@ merge(APQ, [], Acc) -> lists:reverse(Acc, APQ); merge([{P, A}|As], [{P, B}|Bs], Acc) -> merge(As, Bs, [ {P, join(A, B)} | Acc ]); -merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity -> merge(As, Bs, [ {PA, A} | Acc ]); merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> merge(As, Bs, [ {PB, B} | Acc ]). @@ -174,3 +189,6 @@ r2f([]) -> {queue, [], []}; r2f([_] = R) -> {queue, [], R}; r2f([X,Y]) -> {queue, [X], [Y]}; r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}. + +maybe_negate_priority(infinity) -> infinity; +maybe_negate_priority(P) -> -P. diff --git a/src/rabbit.erl b/src/rabbit.erl index 6ef816c0..46f7d9d1 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -134,6 +134,18 @@ {requires, empty_db_check}, {enables, routing_ready}]}). +-rabbit_boot_step({mirror_queue_slave_sup, + [{description, "mirror queue slave sup"}, + {mfa, {rabbit_mirror_queue_slave_sup, start, []}}, + {requires, recovery}, + {enables, routing_ready}]}). + +-rabbit_boot_step({mirrored_queues, + [{description, "adding mirrors to queues"}, + {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, + {requires, mirror_queue_slave_sup}, + {enables, routing_ready}]}). + -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}, {requires, core_initialized}]}). @@ -175,7 +187,7 @@ -spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(stop_and_halt/0 :: () -> 'ok'). +-spec(stop_and_halt/0 :: () -> no_return()). -spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). -spec(status/0 :: () -> [{pid, integer()} | diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2d5b696a..84d8cdcb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,9 +32,7 @@ %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, - sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1, drop_expired/1, - emit_stats/1]). + set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -100,7 +98,6 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). --spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') @@ -143,11 +140,8 @@ -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(sync_timeout/1 :: (pid()) -> 'ok'). --spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). --spec(maybe_expire/1 :: (pid()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). @@ -231,7 +225,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> end). store_queue(Q = #amqqueue{durable = true}) -> - ok = mnesia:write(rabbit_durable_queue, Q, write), + ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write), ok = mnesia:write(rabbit_queue, Q, write), ok; store_queue(Q = #amqqueue{durable = false}) -> @@ -406,9 +400,6 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). -emit_stats(#amqqueue{pid = QPid}) -> - delegate_cast(QPid, emit_stats). - delete_immediately(#amqqueue{ pid = QPid }) -> gen_server2:cast(QPid, delete_immediately). @@ -487,24 +478,12 @@ internal_delete(QueueName) -> run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). -sync_timeout(QPid) -> - gen_server2:cast(QPid, sync_timeout). - -update_ram_duration(QPid) -> - gen_server2:cast(QPid, update_ram_duration). - set_ram_duration_target(QPid, Duration) -> gen_server2:cast(QPid, {set_ram_duration_target, Duration}). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -maybe_expire(QPid) -> - gen_server2:cast(QPid, maybe_expire). - -drop_expired(QPid) -> - gen_server2:cast(QPid, drop_expired). - on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ae677ab4..c9dc6c8f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -31,7 +31,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2]). + prioritise_cast/2, prioritise_info/2, format_message_queue/2]). -export([init_with_backing_queue_state/7]). @@ -249,8 +249,7 @@ backing_queue_module(#amqqueue{arguments = Args}) -> end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -258,14 +257,12 @@ ensure_sync_timer(State) -> stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> State; stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{sync_timer_ref = undefined}. ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), + TRef = erlang:send_after( + ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration), State#q{rate_timer_ref = TRef}; ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> State#q{rate_timer_ref = undefined}; @@ -277,13 +274,13 @@ stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> State#q{rate_timer_ref = undefined}; stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{rate_timer_ref = undefined}. stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{expiry_timer_ref = undefined}. %% We wish to expire only when there are no consumers *and* the expiry @@ -295,18 +292,16 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> case is_unused(State) of true -> NewState = stop_expiry_timer(State), - {ok, TRef} = timer:apply_after( - Expires, rabbit_amqqueue, maybe_expire, [self()]), + TRef = erlang:send_after(Expires, self(), maybe_expire), NewState#q{expiry_timer_ref = TRef}; false -> State end. ensure_stats_timer(State = #q{stats_timer = StatsTimer, - q = Q}) -> + q = #amqqueue{pid = QPid}}) -> State#q{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, - fun() -> rabbit_amqqueue:emit_stats(Q) end)}. + StatsTimer, QPid, emit_stats)}. assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> @@ -700,8 +695,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, when TTL =/= undefined -> case BQ:is_empty(BQS) of true -> State; - false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired, - [self()]), + false -> TRef = erlang:send_after(TTL, self(), drop_expired), State#q{ttl_timer_ref = TRef} end; ensure_ttl_timer(State) -> @@ -792,25 +786,27 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; {run_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; _ -> 0 end. -prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, - #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8; -prioritise_info(_Msg, _State) -> 0. +prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> + case Msg of + {'DOWN', _, process, DownPid, _} -> 8; + update_ram_duration -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + sync_timeout -> 6; + _ -> 0 + end. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -1016,9 +1012,6 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast(sync_timeout, State) -> - noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); - handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. noreply(deliver_or_enqueue(Delivery, State)); @@ -1089,15 +1082,6 @@ handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), noreply(State); -handle_cast(update_ram_duration, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State#q{rate_timer_ref = just_measured, - backing_queue_state = BQS2}); - handle_cast({set_ram_duration_target, Duration}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), @@ -1105,24 +1089,24 @@ handle_cast({set_ram_duration_target, Duration}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); + noreply(State). -handle_cast(maybe_expire, State) -> +handle_info(maybe_expire, State) -> case is_unused(State) of true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) end; -handle_cast(drop_expired, State) -> +handle_info(drop_expired, State) -> noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); -handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> +handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, assert_invariant(State1), - {noreply, State1, hibernate}. + {noreply, State1, hibernate}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -1139,6 +1123,18 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; +handle_info(update_ram_duration, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State#q{rate_timer_ref = just_measured, + backing_queue_state = BQS2}); + +handle_info(sync_timeout, State) -> + noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); + handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); @@ -1159,10 +1155,11 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), BQS3 = BQ:handle_pre_hibernate(BQS2), - rabbit_event:if_enabled(StatsTimer, - fun () -> - emit_stats(State, [{idle_since, now()}]) - end), + rabbit_event:if_enabled( + StatsTimer, + fun () -> emit_stats(State, [{idle_since, now()}]) end), State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), backing_queue_state = BQS3}, {hibernate, stop_rate_timer(State1)}. + +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl new file mode 100644 index 00000000..22691ef9 --- /dev/null +++ b/src/rabbit_backing_queue_qc.erl @@ -0,0 +1,392 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_backing_queue_qc). +-ifdef(use_proper_qc). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). +-include_lib("proper/include/proper.hrl"). + +-behaviour(proper_statem). + +-define(BQMOD, rabbit_variable_queue). +-define(QUEUE_MAXLEN, 10000). +-define(TIMEOUT_LIMIT, 100). + +-define(RECORD_INDEX(Key, Record), + proplists:get_value(Key, lists:zip( + record_info(fields, Record), lists:seq(2, record_info(size, Record))))). + +-export([initial_state/0, command/1, precondition/2, postcondition/3, + next_state/3]). + +-export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]). + +-record(state, {bqstate, + len, %% int + messages, %% queue of {msg_props, basic_msg} + acks, %% dict of acktag => {msg_props, basic_msg} + confirms}). %% set of msgid + +%% Initialise model + +initial_state() -> + #state{bqstate = qc_variable_queue_init(qc_test_queue()), + len = 0, + messages = queue:new(), + acks = orddict:new(), + confirms = gb_sets:new()}. + +%% Property + +prop_backing_queue_test() -> + ?FORALL(Cmds, commands(?MODULE, initial_state()), + backing_queue_test(Cmds)). + +backing_queue_test(Cmds) -> + {ok, FileSizeLimit} = + application:get_env(rabbit, msg_store_file_size_limit), + application:set_env(rabbit, msg_store_file_size_limit, 512, + infinity), + {ok, MaxJournal} = + application:get_env(rabbit, queue_index_max_journal_entries), + application:set_env(rabbit, queue_index_max_journal_entries, 128, + infinity), + + {_H, #state{bqstate = BQ}, Res} = run_commands(?MODULE, Cmds), + + application:set_env(rabbit, msg_store_file_size_limit, + FileSizeLimit, infinity), + application:set_env(rabbit, queue_index_max_journal_entries, + MaxJournal, infinity), + + ?BQMOD:delete_and_terminate(shutdown, BQ), + ?WHENFAIL( + io:format("Result: ~p~n", [Res]), + aggregate(command_names(Cmds), Res =:= ok)). + +%% Commands + +%% Command frequencies are tuned so that queues are normally reasonably +%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple +%% and purging cause extreme queue lengths, so these have lower probabilities. +%% Fetches are sufficiently frequent so that commands that need acktags +%% get decent coverage. + +command(S) -> + frequency([{10, qc_publish(S)}, + {1, qc_publish_delivered(S)}, + {1, qc_publish_multiple(S)}, %% very slow + {15, qc_fetch(S)}, %% needed for ack and requeue + {15, qc_ack(S)}, + {15, qc_requeue(S)}, + {3, qc_set_ram_duration_target(S)}, + {1, qc_ram_duration(S)}, + {1, qc_drain_confirmed(S)}, + {1, qc_dropwhile(S)}, + {1, qc_is_empty(S)}, + {1, qc_timeout(S)}, + {1, qc_purge(S)}]). + +qc_publish(#state{bqstate = BQ}) -> + {call, ?BQMOD, publish, + [qc_message(), + #message_properties{needs_confirming = frequency([{1, true}, + {20, false}]), + expiry = oneof([undefined | lists:seq(1, 10)])}, + self(), BQ]}. + +qc_publish_multiple(#state{bqstate = BQ}) -> + {call, ?MODULE, publish_multiple, + [qc_message(), #message_properties{}, BQ, + resize(?QUEUE_MAXLEN, pos_integer())]}. + +qc_publish_delivered(#state{bqstate = BQ}) -> + {call, ?BQMOD, publish_delivered, + [boolean(), qc_message(), #message_properties{}, self(), BQ]}. + +qc_fetch(#state{bqstate = BQ}) -> + {call, ?BQMOD, fetch, [boolean(), BQ]}. + +qc_ack(#state{bqstate = BQ, acks = Acks}) -> + {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}. + +qc_requeue(#state{bqstate = BQ, acks = Acks}) -> + {call, ?BQMOD, requeue, + [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. + +qc_set_ram_duration_target(#state{bqstate = BQ}) -> + {call, ?BQMOD, set_ram_duration_target, + [oneof([0, 1, 2, resize(1000, pos_integer()), infinity]), BQ]}. + +qc_ram_duration(#state{bqstate = BQ}) -> + {call, ?BQMOD, ram_duration, [BQ]}. + +qc_drain_confirmed(#state{bqstate = BQ}) -> + {call, ?BQMOD, drain_confirmed, [BQ]}. + +qc_dropwhile(#state{bqstate = BQ}) -> + {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}. + +qc_is_empty(#state{bqstate = BQ}) -> + {call, ?BQMOD, is_empty, [BQ]}. + +qc_timeout(#state{bqstate = BQ}) -> + {call, ?MODULE, timeout, [BQ, ?TIMEOUT_LIMIT]}. + +qc_purge(#state{bqstate = BQ}) -> + {call, ?BQMOD, purge, [BQ]}. + +%% Preconditions + +precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg}) + when Fun =:= ack; Fun =:= requeue -> + orddict:size(Acks) > 0; +precondition(#state{messages = Messages}, + {call, ?BQMOD, publish_delivered, _Arg}) -> + queue:is_empty(Messages); +precondition(_S, {call, ?BQMOD, _Fun, _Arg}) -> + true; +precondition(_S, {call, ?MODULE, timeout, _Arg}) -> + true; +precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) -> + Len < ?QUEUE_MAXLEN. + +%% Model updates + +next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> + #state{len = Len, messages = Messages, confirms = Confirms} = S, + MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, + NeedsConfirm = + {call, erlang, element, + [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]}, + S#state{bqstate = BQ, + len = Len + 1, + messages = queue:in({MsgProps, Msg}, Messages), + confirms = case eval(NeedsConfirm) of + true -> gb_sets:add(MsgId, Confirms); + _ -> Confirms + end}; + +next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) -> + #state{len = Len, messages = Messages} = S, + Messages1 = repeat(Messages, fun(Msgs) -> + queue:in({MsgProps, Msg}, Msgs) + end, Count), + S#state{bqstate = BQ, + len = Len + Count, + messages = Messages1}; + +next_state(S, Res, + {call, ?BQMOD, publish_delivered, + [AckReq, Msg, MsgProps, _Pid, _BQ]}) -> + #state{confirms = Confirms, acks = Acks} = S, + AckTag = {call, erlang, element, [1, Res]}, + BQ1 = {call, erlang, element, [2, Res]}, + MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, + NeedsConfirm = + {call, erlang, element, + [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]}, + S#state{bqstate = BQ1, + confirms = case eval(NeedsConfirm) of + true -> gb_sets:add(MsgId, Confirms); + _ -> Confirms + end, + acks = case AckReq of + true -> orddict:append(AckTag, {MsgProps, Msg}, Acks); + false -> Acks + end + }; + +next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) -> + #state{len = Len, messages = Messages, acks = Acks} = S, + ResultInfo = {call, erlang, element, [1, Res]}, + BQ1 = {call, erlang, element, [2, Res]}, + AckTag = {call, erlang, element, [3, ResultInfo]}, + S1 = S#state{bqstate = BQ1}, + case queue:out(Messages) of + {empty, _M2} -> + S1; + {{value, MsgProp_Msg}, M2} -> + S2 = S1#state{len = Len - 1, messages = M2}, + case AckReq of + true -> + S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)}; + false -> + S2 + end + end; + +next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> + #state{acks = AcksState} = S, + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1, + acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)}; + +next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) -> + #state{len = Len, messages = Messages, acks = AcksState} = S, + BQ1 = {call, erlang, element, [2, Res]}, + RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) || + Key <- AcksArg]), + S#state{bqstate = BQ1, + len = Len + length(RequeueMsgs), + messages = queue:join(Messages, queue:from_list(RequeueMsgs)), + acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)}; + +next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) -> + S#state{bqstate = BQ}; + +next_state(S, Res, {call, ?BQMOD, ram_duration, _Args}) -> + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1}; + +next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1}; + +next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) -> + #state{messages = Messages} = S, + Messages1 = drop_messages(Messages), + S#state{bqstate = BQ1, len = queue:len(Messages1), messages = Messages1}; + +next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) -> + S; + +next_state(S, BQ, {call, ?MODULE, timeout, _Args}) -> + S#state{bqstate = BQ}; + +next_state(S, Res, {call, ?BQMOD, purge, _Args}) -> + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1, len = 0, messages = queue:new()}. + +%% Postconditions + +postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> + #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, + case Res of + {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} -> + {_MsgProps, Msg} = queue:head(Messages), + MsgFetched =:= Msg andalso + not orddict:is_key(AckTag, Acks) andalso + not gb_sets:is_element(AckTag, Confrms) andalso + RemainingLen =:= Len - 1; + {empty, _BQ} -> + Len =:= 0 + end; + +postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) -> + #state{acks = Acks, confirms = Confrms} = S, + not orddict:is_key(AckTag, Acks) andalso + not gb_sets:is_element(AckTag, Confrms); + +postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) -> + {PurgeCount, _BQ} = Res, + Len =:= PurgeCount; + +postcondition(#state{len = Len}, + {call, ?BQMOD, is_empty, _Args}, Res) -> + (Len =:= 0) =:= Res; + +postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) -> + #state{confirms = Confirms} = S, + {ReportedConfirmed, _BQ} = Res, + lists:all(fun (M) -> + gb_sets:is_element(M, Confirms) + end, ReportedConfirmed); + +postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> + ?BQMOD:len(BQ) =:= Len. + +%% Helpers + +repeat(Result, _Fun, 0) -> + Result; +repeat(Result, Fun, Times) -> + repeat(Fun(Result), Fun, Times - 1). + +publish_multiple(Msg, MsgProps, BQ, Count) -> + repeat(BQ, fun(BQ1) -> + ?BQMOD:publish(Msg, MsgProps, self(), BQ1) + end, Count). + +timeout(BQ, 0) -> + BQ; +timeout(BQ, AtMost) -> + case ?BQMOD:needs_timeout(BQ) of + false -> BQ; + _ -> timeout(?BQMOD:timeout(BQ), AtMost - 1) + end. + +qc_message_payload() -> + ?SIZED(Size, resize(Size * Size, binary())). + +qc_routing_key() -> + noshrink(binary(10)). + +qc_delivery_mode() -> + oneof([1, 2]). + +qc_message() -> + qc_message(qc_delivery_mode()). + +qc_message(DeliveryMode) -> + {call, rabbit_basic, message, [ + qc_default_exchange(), + qc_routing_key(), + #'P_basic'{delivery_mode = DeliveryMode}, + qc_message_payload()]}. + +qc_default_exchange() -> + {call, rabbit_misc, r, [<<>>, exchange, <<>>]}. + +qc_variable_queue_init(Q) -> + {call, ?BQMOD, init, + [Q, false, function(2, ok)]}. + +qc_test_q() -> + {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}. + +qc_test_queue() -> + qc_test_queue(boolean()). + +qc_test_queue(Durable) -> + #amqqueue{name = qc_test_q(), + durable = Durable, + auto_delete = false, + arguments = [], + pid = self()}. + +rand_choice([]) -> []; +rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)]. + +dropfun(Props) -> + Expiry = eval({call, erlang, element, + [?RECORD_INDEX(expiry, message_properties), Props]}), + Expiry =/= 1. + +drop_messages(Messages) -> + case queue:out(Messages) of + {empty, _} -> + Messages; + {{value, MsgProps_Msg}, M2} -> + MsgProps = {call, erlang, element, [1, MsgProps_Msg]}, + case dropfun(MsgProps) of + true -> drop_messages(M2); + false -> Messages + end + end. + +-endif. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2829d397..35d77c45 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -23,11 +23,11 @@ -export([start_link/10, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([refresh_config_all/0, emit_stats/1, ready_for_close/1]). +-export([refresh_config_all/0, ready_for_close/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2]). + prioritise_cast/2, prioritise_info/2, format_message_queue/2]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter, tx_status, next_tag, @@ -90,7 +90,6 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(refresh_config_all/0 :: () -> 'ok'). --spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(ready_for_close/1 :: (pid()) -> 'ok'). -endif. @@ -152,9 +151,6 @@ refresh_config_all() -> fun (C) -> gen_server2:call(C, refresh_config) end, list()), ok. -emit_stats(Pid) -> - gen_server2:cast(Pid, emit_stats). - ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). @@ -194,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, trace_state = rabbit_trace:init(VHost)}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, - fun() -> internal_emit_stats(State) end), + fun() -> emit_stats(State) end), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -207,11 +203,16 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - emit_stats -> 7; {confirm, _MsgSeqNos, _QPid} -> 5; _ -> 0 end. +prioritise_info(Msg, _State) -> + case Msg of + emit_stats -> 7; + _ -> 0 + end. + handle_call(flush, _From, State) -> reply(ok, State); @@ -293,11 +294,6 @@ handle_cast({deliver, ConsumerTag, AckRequired, rabbit_trace:tap_trace_out(Msg, TraceState), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> - internal_emit_stats(State), - noreply([ensure_stats_timer], - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); - handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). @@ -305,6 +301,11 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); +handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> + emit_stats(State), + noreply([ensure_stats_timer], + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); + handle_info({'DOWN', MRef, process, QPid, Reason}, State = #ch{consumer_monitors = ConsumerMonitors}) -> noreply( @@ -320,11 +321,8 @@ handle_info({'EXIT', _Pid, Reason}, State) -> handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - rabbit_event:if_enabled(StatsTimer, - fun () -> - internal_emit_stats( - State, [{idle_since, now()}]) - end), + rabbit_event:if_enabled( + StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end), StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), {hibernate, State#ch{stats_timer = StatsTimer1}}. @@ -342,6 +340,8 @@ terminate(Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> reply(Reply, [], NewState). @@ -366,8 +366,7 @@ next_state(Mask, State) -> ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), State#ch{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, - fun() -> emit_stats(ChPid) end)}. + StatsTimer, ChPid, emit_stats)}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -1497,10 +1496,10 @@ update_measures(Type, QX, Inc, Measure) -> put({Type, QX}, orddict:store(Measure, Cur + Inc, Measures)). -internal_emit_stats(State) -> - internal_emit_stats(State, []). +emit_stats(State) -> + emit_stats(State, []). -internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> +emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> CoarseStats = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(StatsTimer) of coarse -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6eb1aaba..e8afed0c 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -17,7 +17,7 @@ -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/5, diagnostics/1]). +-export([start/0, stop/0, action/5, diagnostics/1, log_action/3]). -define(RPC_TIMEOUT, infinity). -define(WAIT_FOR_VM_ATTEMPTS, 5). @@ -51,6 +51,7 @@ -> 'ok'). -spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]). -spec(usage/0 :: () -> no_return()). +-spec(log_action/3 :: (node(), string(), [term()]) -> ok). -endif. @@ -73,6 +74,7 @@ start() -> Command = list_to_atom(Command0), Quiet = proplists:get_bool(?QUIET_OPT, Opts1), Node = proplists:get_value(?NODE_OPT, Opts1), + rpc_call(Node, rabbit_control, log_action, [node(), Command0, Args]), Inform = case Quiet of true -> fun (_Format, _Args1) -> ok end; false -> fun (Format, Args1) -> @@ -474,3 +476,22 @@ quit(Status) -> {unix, _} -> halt(Status); {win32, _} -> init:stop(Status) end. + +log_action(Node, Command, Args) -> + rabbit_misc:with_local_io( + fun () -> + error_logger:info_msg("~p executing~n rabbitmqctl ~s ~s~n", + [Node, Command, + format_args(mask_args(Command, Args))]) + end). + +%% Mask passwords and other sensitive info before logging. +mask_args("add_user", [Name, _Password | Args]) -> + [Name, "****" | Args]; +mask_args("change_password", [Name, _Password | Args]) -> + [Name, "****" | Args]; +mask_args(_, Args) -> + Args. + +format_args(Args) -> + string:join([io_lib:format("~p", [A]) || A <- Args], " "). diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 468f9293..bb765566 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -19,7 +19,7 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]). +-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/1]). -export([reset_stats_timer/1]). -export([stats_level/1, if_enabled/2]). -export([notify/2, notify_if/3]). @@ -57,7 +57,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(init_stats_timer/0 :: () -> state()). --spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()). +-spec(ensure_stats_timer/3 :: (state(), pid(), term()) -> state()). -spec(stop_stats_timer/1 :: (state()) -> state()). -spec(reset_stats_timer/1 :: (state()) -> state()). -spec(stats_level/1 :: (state()) -> level()). @@ -80,7 +80,7 @@ start_link() -> %% if_enabled(internal_emit_stats) - so we immediately send something %% %% On wakeup: -%% ensure_stats_timer(Timer, emit_stats) +%% ensure_stats_timer(Timer, Pid, emit_stats) %% (Note we can't emit stats immediately, the timer may have fired 1ms ago.) %% %% emit_stats: @@ -99,13 +99,13 @@ init_stats_timer() -> {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), #state{level = StatsLevel, interval = Interval, timer = undefined}. -ensure_stats_timer(State = #state{level = none}, _Fun) -> +ensure_stats_timer(State = #state{level = none}, _Pid, _Msg) -> State; ensure_stats_timer(State = #state{interval = Interval, - timer = undefined}, Fun) -> - {ok, TRef} = timer:apply_after(Interval, erlang, apply, [Fun, []]), + timer = undefined}, Pid, Msg) -> + TRef = erlang:send_after(Interval, Pid, Msg), State#state{timer = TRef}; -ensure_stats_timer(State, _Fun) -> +ensure_stats_timer(State, _Pid, _Msg) -> State. stop_stats_timer(State = #state{level = none}) -> @@ -113,7 +113,7 @@ stop_stats_timer(State = #state{level = none}) -> stop_stats_timer(State = #state{timer = undefined}) -> State; stop_stats_timer(State = #state{timer = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#state{timer = undefined}. reset_stats_timer(State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 15193d2b..8ee90645 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -58,7 +58,7 @@ -record(lim, {prefetch_count = 0, ch_pid, blocked = false, - queues = dict:new(), % QPid -> {MonitorRef, Notify} + queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to @@ -224,31 +224,30 @@ limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> blocked(#lim{blocked = Blocked}) -> Blocked. remember_queue(QPid, State = #lim{queues = Queues}) -> - case dict:is_key(QPid, Queues) of + case orddict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), - State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; + State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)}; true -> State end. forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> - case dict:find(QPid, Queues) of - {ok, {MRef, _}} -> - true = erlang:demonitor(MRef), - ok = rabbit_amqqueue:unblock(QPid, ChPid), - State#lim{queues = dict:erase(QPid, Queues)}; - error -> State + case orddict:find(QPid, Queues) of + {ok, {MRef, _}} -> true = erlang:demonitor(MRef), + ok = rabbit_amqqueue:unblock(QPid, ChPid), + State#lim{queues = orddict:erase(QPid, Queues)}; + error -> State end. limit_queue(QPid, State = #lim{queues = Queues}) -> UpdateFun = fun ({MRef, _}) -> {MRef, true} end, - State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. + State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}. notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> {QList, NewQueues} = - dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; - (QPid, {MRef, true}, {L, D}) -> - {[QPid | L], dict:store(QPid, {MRef, false}, D)} - end, {[], Queues}, Queues), + orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + (QPid, {MRef, true}, {L, D}) -> + {[QPid | L], orddict:store(QPid, {MRef, false}, D)} + end, {[], Queues}, Queues), case length(QList) of 0 -> ok; L -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b38a8967..c918f388 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -37,7 +37,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2]). + prioritise_cast/2, prioritise_info/2]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -89,9 +89,8 @@ init([#amqqueue { name = QueueName } = Q]) -> %% ASSERTION [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], MPids1 = MPids ++ [Self], - mnesia:write(rabbit_queue, - Q1 #amqqueue { slave_pids = MPids1 }, - write), + ok = rabbit_amqqueue:store_queue( + Q1 #amqqueue { slave_pids = MPids1 }), {ok, QPid} end), erlang:monitor(process, MPid), @@ -187,9 +186,9 @@ handle_cast({set_ram_duration_target, Duration}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), - noreply(State #state { backing_queue_state = BQS1 }); + noreply(State #state { backing_queue_state = BQS1 }). -handle_cast(update_ram_duration, +handle_info(update_ram_duration, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), @@ -199,9 +198,9 @@ handle_cast(update_ram_duration, noreply(State #state { rate_timer_ref = just_measured, backing_queue_state = BQS2 }); -handle_cast(sync_timeout, State) -> +handle_info(sync_timeout, State) -> noreply(backing_queue_timeout( - State #state { sync_timer_ref = undefined })). + State #state { sync_timer_ref = undefined })); handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); @@ -266,16 +265,21 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; {gm, _Msg} -> 5; {post_commit, _Txn, _AckTags} -> 4; _ -> 0 end. +prioritise_info(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + sync_timeout -> 6; + _ -> 0 + end. + %% --------------------------------------------------------------------------- %% GM %% --------------------------------------------------------------------------- @@ -516,8 +520,7 @@ backing_queue_timeout(State = #state { backing_queue = BQ }) -> run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), State #state { sync_timer_ref = TRef }; ensure_sync_timer(State) -> State. @@ -525,14 +528,12 @@ ensure_sync_timer(State) -> stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> State; stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State #state { sync_timer_ref = undefined }. ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), + TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), State #state { rate_timer_ref = TRef }; ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) -> State #state { rate_timer_ref = undefined }; @@ -544,7 +545,7 @@ stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> stop_rate_timer(State = #state { rate_timer_ref = just_measured }) -> State #state { rate_timer_ref = undefined }; stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State #state { rate_timer_ref = undefined }. ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index 879a6017..fc04ec79 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -16,18 +16,6 @@ -module(rabbit_mirror_queue_slave_sup). --rabbit_boot_step({mirror_queue_slave_sup, - [{description, "mirror queue slave sup"}, - {mfa, {rabbit_mirror_queue_slave_sup, start, []}}, - {requires, recovery}, - {enables, routing_ready}]}). - --rabbit_boot_step({mirrored_queues, - [{description, "adding mirrors to queues"}, - {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, - {requires, mirror_queue_slave_sup}, - {enables, routing_ready}]}). - -behaviour(supervisor2). -export([start/0, start_link/0, start_child/2]). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b6b97f6d..b98dbd46 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -42,7 +42,7 @@ -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]). -export([append_file/2, ensure_parent_dirs_exist/1]). --export([format_stderr/2]). +-export([format_stderr/2, with_local_io/1]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). @@ -57,6 +57,7 @@ -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). -export([pget/2, pget/3, pget_or_die/2]). +-export([format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -164,6 +165,7 @@ -spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). +-spec(with_local_io/1 :: (fun (() -> A)) -> A). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). @@ -205,6 +207,7 @@ -spec(pget/2 :: (term(), [term()]) -> term()). -spec(pget/3 :: (term(), [term()], term()) -> term()). -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). +-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -endif. @@ -603,6 +606,17 @@ format_stderr(Fmt, Args) -> end, ok. +%% Execute Fun using the IO system of the local node (i.e. the node on +%% which the code is executing). +with_local_io(Fun) -> + GL = group_leader(), + group_leader(whereis(user), self()), + try + Fun() + after + group_leader(GL, self()) + end. + manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> Iterate(fun (App, Acc) -> case Do(App) of @@ -919,3 +933,24 @@ pget_or_die(K, P) -> undefined -> exit({error, key_missing, K}); V -> V end. + +format_message_queue(_Opt, MQ) -> + Len = priority_queue:len(MQ), + {Len, + case Len > 100 of + false -> priority_queue:to_list(MQ); + true -> {summary, + orddict:to_list( + lists:foldl( + fun ({P, V}, Counts) -> + orddict:update_counter( + {P, format_message_queue_entry(V)}, 1, Counts) + end, orddict:new(), priority_queue:to_list(MQ)))} + end}. + +format_message_queue_entry(V) when is_atom(V) -> + V; +format_message_queue_entry(V) when is_tuple(V) -> + list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]); +format_message_queue_entry(_V) -> + '_'. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 8d5c8646..ab553a8b 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -23,7 +23,8 @@ empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, create_cluster_nodes_config/1, read_cluster_nodes_config/0, record_running_nodes/0, read_previously_running_nodes/0, - delete_previously_running_nodes/0, running_nodes_filename/0]). + delete_previously_running_nodes/0, running_nodes_filename/0, + is_disc_node/0]). -export([table_names/0]). @@ -65,6 +66,7 @@ -spec(read_previously_running_nodes/0 :: () -> [node()]). -spec(delete_previously_running_nodes/0 :: () -> 'ok'). -spec(running_nodes_filename/0 :: () -> file:filename()). +-spec(is_disc_node/0 :: () -> boolean()). -endif. @@ -115,13 +117,47 @@ force_cluster(ClusterNodes) -> cluster(ClusterNodes, Force) -> ensure_mnesia_not_running(), ensure_mnesia_dir(), - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + + %% Wipe mnesia if we're changing type from disc to ram + case {is_disc_node(), should_be_disc_node(ClusterNodes)} of + {true, false} -> error_logger:warning_msg( + "changing node type; wiping mnesia...~n~n"), + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema); + _ -> ok + end, + + %% Pre-emptively leave the cluster + %% + %% We're trying to handle the following two cases: + %% 1. We have a two-node cluster, where both nodes are disc nodes. + %% One node is re-clustered as a ram node. When it tries to + %% re-join the cluster, but before it has time to update its + %% tables definitions, the other node will order it to re-create + %% its disc tables. So, we need to leave the cluster before we + %% can join it again. + %% 2. We have a two-node cluster, where both nodes are disc nodes. + %% One node is forcefully reset (so, the other node thinks its + %% still a part of the cluster). The reset node is re-clustered + %% as a ram node. Same as above, we need to leave the cluster + %% before we can join it. But, since we don't know if we're in a + %% cluster or not, we just pre-emptively leave it before joining. + ProperClusterNodes = ClusterNodes -- [node()], + try + ok = leave_cluster(ProperClusterNodes, ProperClusterNodes) + catch + {error, {no_running_cluster_nodes, _, _}} when Force -> + ok + end, + + %% Join the cluster + start_mnesia(), try ok = init_db(ClusterNodes, Force, fun maybe_upgrade_local_or_record_desired/0), ok = create_cluster_nodes_config(ClusterNodes) after - mnesia:stop() + stop_mnesia() end, ok. @@ -158,10 +194,13 @@ nodes_of_type(Type) -> %% This function should return the nodes of a certain type (ram, %% disc or disc_only) in the current cluster. The type of nodes %% is determined when the cluster is initially configured. - %% Specifically, we check whether a certain table, which we know - %% will be written to disk on a disc node, is stored on disk or in - %% RAM. - mnesia:table_info(rabbit_durable_exchange, Type). + mnesia:table_info(schema, Type). + +%% The tables aren't supposed to be on disk on a ram node +table_definitions(disc) -> + table_definitions(); +table_definitions(ram) -> + [{Tab, copy_type_to_ram(TabDef)} || {Tab, TabDef} <- table_definitions()]. table_definitions() -> [{rabbit_user, @@ -218,8 +257,6 @@ table_definitions() -> {type, ordered_set}, {match, #topic_trie_binding{trie_binding = trie_binding_match(), _='_'}}]}, - %% Consider the implications to nodes_of_type/1 before altering - %% the next entry. {rabbit_durable_exchange, [{record_name, exchange}, {attributes, record_info(fields, exchange)}, @@ -341,7 +378,11 @@ check_table_content(Tab, TabDef) -> end. check_tables(Fun) -> - case [Error || {Tab, TabDef} <- table_definitions(), + case [Error || {Tab, TabDef} <- table_definitions( + case is_disc_node() of + true -> disc; + false -> ram + end), case Fun(Tab, TabDef) of ok -> Error = none, false; {error, Error} -> true @@ -442,30 +483,47 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir)} of - {[], false} -> + WantDiscNode = should_be_disc_node(ClusterNodes), + WasDiscNode = is_disc_node(), + %% We create a new db (on disk, or in ram) in the first + %% two cases and attempt to upgrade the in the other two + case {Nodes, WasDiscNode, WantDiscNode} of + {[], _, false} -> + %% New ram node; start from scratch + ok = create_schema(ram); + {[], false, true} -> %% Nothing there at all, start from scratch - ok = create_schema(); - {[], true} -> + ok = create_schema(disc); + {[], true, true} -> %% We're the first node up case rabbit_upgrade:maybe_upgrade_local() of ok -> ensure_schema_integrity(); version_not_available -> ok = schema_ok_or_move() - end, - ok; - {[AnotherNode|_], _} -> + end; + {[AnotherNode|_], _, _} -> %% Subsequent node in cluster, catch up ensure_version_ok( rpc:call(AnotherNode, rabbit_version, recorded, [])), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), + {CopyType, CopyTypeAlt} = + case WantDiscNode of + true -> {disc, disc_copies}; + false -> {ram, ram_copies} + end, ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(case IsDiskNode of - true -> disc; - false -> ram - end), + ok = create_local_table_copy(schema, CopyTypeAlt), + ok = create_local_table_copies(CopyType), + ok = SecondaryPostMnesiaFun(), + %% We've taken down mnesia, so ram nodes will need + %% to re-sync + case is_disc_node() of + false -> start_mnesia(), + mnesia:change_config(extra_db_nodes, + ProperClusterNodes), + wait_for_replicated_tables(); + true -> ok + end, + ensure_schema_integrity(), ok end; @@ -496,7 +554,7 @@ schema_ok_or_move() -> "and recreating schema from scratch~n", [Reason]), ok = move_db(), - ok = create_schema() + ok = create_schema(disc) end. ensure_version_ok({ok, DiscVersion}) -> @@ -508,18 +566,27 @@ ensure_version_ok({ok, DiscVersion}) -> ensure_version_ok({error, _}) -> ok = rabbit_version:record_desired(). -create_schema() -> - mnesia:stop(), - rabbit_misc:ensure_ok(mnesia:create_schema([node()]), - cannot_create_schema), - rabbit_misc:ensure_ok(mnesia:start(), - cannot_start_mnesia), - ok = create_tables(), +create_schema(Type) -> + stop_mnesia(), + case Type of + disc -> rabbit_misc:ensure_ok(mnesia:create_schema([node()]), + cannot_create_schema); + ram -> %% remove the disc schema since this is a ram node + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema) + end, + start_mnesia(), + ok = create_tables(Type), ensure_schema_integrity(), ok = rabbit_version:record_desired(). +is_disc_node() -> mnesia:system_info(use_dir). + +should_be_disc_node(ClusterNodes) -> + ClusterNodes == [] orelse lists:member(node(), ClusterNodes). + move_db() -> - mnesia:stop(), + stop_mnesia(), MnesiaDir = filename:dirname(dir() ++ "/"), {{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(), BackupDir = lists:flatten( @@ -537,14 +604,16 @@ move_db() -> MnesiaDir, BackupDir, Reason}}) end, ensure_mnesia_dir(), - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + start_mnesia(), ok. copy_db(Destination) -> ok = ensure_mnesia_not_running(), rabbit_misc:recursive_copy(dir(), Destination). -create_tables() -> +create_tables() -> create_tables(disc). + +create_tables(Type) -> lists:foreach(fun ({Tab, TabDef}) -> TabDef1 = proplists:delete(match, TabDef), case mnesia:create_table(Tab, TabDef1) of @@ -554,9 +623,13 @@ create_tables() -> Tab, TabDef1, Reason}}) end end, - table_definitions()), + table_definitions(Type)), ok. +copy_type_to_ram(TabDef) -> + [{disc_copies, []}, {ram_copies, [node()]} + | proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))]. + table_has_copy_type(TabDef, DiscType) -> lists:member(node(), proplists:get_value(DiscType, TabDef, [])). @@ -586,7 +659,7 @@ create_local_table_copies(Type) -> end, ok = create_local_table_copy(Tab, StorageType) end, - table_definitions()), + table_definitions(Type)), ok. create_local_table_copy(Tab, Type) -> @@ -622,14 +695,14 @@ reset(Force) -> true -> ok; false -> ensure_mnesia_dir(), - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + start_mnesia(), {Nodes, RunningNodes} = try ok = init(), {all_clustered_nodes() -- [Node], running_clustered_nodes() -- [Node]} after - mnesia:stop() + stop_mnesia() end, leave_cluster(Nodes, RunningNodes), rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), @@ -652,6 +725,7 @@ leave_cluster(Nodes, RunningNodes) -> [schema, node()]) of {atomic, ok} -> true; {badrpc, nodedown} -> false; + {aborted, {node_not_running, _}} -> false; {aborted, Reason} -> throw({error, {failed_to_leave_cluster, Nodes, RunningNodes, Reason}}) @@ -662,3 +736,11 @@ leave_cluster(Nodes, RunningNodes) -> false -> throw({error, {no_running_cluster_nodes, Nodes, RunningNodes}}) end. + +start_mnesia() -> + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + ensure_mnesia_running(). + +stop_mnesia() -> + stopped = mnesia:stop(), + ensure_mnesia_not_running(). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 3f4162cd..e90e1281 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -23,13 +23,14 @@ client_ref/1, close_all_indicated/1, write/3, read/2, contains/2, remove/2, sync/3]). --export([sync/1, set_maximum_since_use/2, - has_readers/2, combine_files/3, delete_file/2]). %% internal +-export([set_maximum_since_use/2, has_readers/2, combine_files/3, + delete_file/2]). %% internal -export([transform_dir/3, force_recovery/2]). %% upgrade --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, prioritise_call/3, prioritise_cast/2, + prioritise_info/2, format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -153,7 +154,6 @@ -spec(sync/3 :: ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok'). --spec(sync/1 :: (server()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> @@ -443,9 +443,6 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> server_cast(CState, {remove, CRef, MsgIds}). sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}). -sync(Server) -> - gen_server2:cast(Server, sync). - set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -682,7 +679,6 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - sync -> 8; {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; {set_maximum_since_use, _Age} -> 8; @@ -690,6 +686,12 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +prioritise_info(Msg, _State) -> + case Msg of + sync -> 8; + _ -> 0 + end. + handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); @@ -773,9 +775,6 @@ handle_cast({sync, MsgIds, K}, true -> noreply(State #msstate { on_sync = [K | Syncs] }) end; -handle_cast(sync, State) -> - noreply(internal_sync(State)); - handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, @@ -799,6 +798,9 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). +handle_info(sync, State) -> + noreply(internal_sync(State)); + handle_info(timeout, State) -> noreply(internal_sync(State)); @@ -836,6 +838,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, code_change(_OldVsn, State, _Extra) -> {ok, State}. +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + %%---------------------------------------------------------------------------- %% general helper functions %%---------------------------------------------------------------------------- @@ -863,13 +867,13 @@ next_state(State = #msstate { on_sync = Syncs, end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync), State #msstate { sync_timer_ref = TRef }. stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> State; stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State #msstate { sync_timer_ref = undefined }. internal_sync(State = #msstate { current_file_handle = CurHdl, diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 1f30a2fc..78aeb2ef 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -60,24 +60,19 @@ notify_cluster() -> %%-------------------------------------------------------------------- init([]) -> - ok = net_kernel:monitor_nodes(true), {ok, no_state}. handle_call(_Request, _From, State) -> {noreply, State}. handle_cast({rabbit_running_on, Node}, State) -> - rabbit_log:info("node ~p up~n", [Node]), + rabbit_log:info("rabbit on ~p up~n", [Node]), erlang:monitor(process, {rabbit, Node}), ok = rabbit_alarm:on_node_up(Node), {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({nodedown, Node}, State) -> - rabbit_log:info("node ~p down~n", [Node]), - ok = handle_dead_rabbit(Node), - {noreply, State}; handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), ok = handle_dead_rabbit(Node), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index bf89cdb2..636913b5 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -569,13 +569,13 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, add_to_journal(RelSeq, Action, Segment = #segment { journal_entries = JEntries, unacked = UnackedCount }) -> - Segment1 = Segment #segment { - journal_entries = add_to_journal(RelSeq, Action, JEntries) }, - case Action of - del -> Segment1; - ack -> Segment1 #segment { unacked = UnackedCount - 1 }; - ?PUB -> Segment1 #segment { unacked = UnackedCount + 1 } - end; + Segment #segment { + journal_entries = add_to_journal(RelSeq, Action, JEntries), + unacked = UnackedCount + case Action of + ?PUB -> +1; + del -> 0; + ack -> -1 + end}; add_to_journal(RelSeq, Action, JEntries) -> Val = case array:get(RelSeq, JEntries) of @@ -1013,7 +1013,7 @@ add_queue_ttl_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, {[<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest}; add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS, Rest>>) -> + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, Rest}; add_queue_ttl_segment(_) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index dffabf85..2dccc748 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -28,8 +28,6 @@ -export([process_channel_frame/5]). %% used by erlang-client --export([emit_stats/1]). - -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). @@ -70,7 +68,6 @@ -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). --spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> @@ -126,9 +123,6 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. -emit_stats(Pid) -> - gen_server:cast(Pid, emit_stats). - conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. @@ -323,8 +317,8 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> catch Error -> {error, Error} end), mainloop(Deb, State); -handle_other({'$gen_cast', emit_stats}, Deb, State) -> - mainloop(Deb, internal_emit_stats(State)); +handle_other(emit_stats, Deb, State) -> + mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); handle_other(Other, _Deb, _State) -> @@ -591,10 +585,8 @@ refuse_connection(Sock, Exception) -> ensure_stats_timer(State = #v1{stats_timer = StatsTimer, connection_state = running}) -> - Self = self(), State#v1{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, - fun() -> emit_stats(Self) end)}; + StatsTimer, self(), emit_stats)}; ensure_stats_timer(State) -> State. @@ -694,7 +686,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), rabbit_event:if_enabled(StatsTimer, - fun() -> internal_emit_stats(State1) end), + fun() -> emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), @@ -923,6 +915,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}}, State1#v1.sock, 0, CloseMethod, Protocol), State1. -internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> +emit_stats(State = #v1{stats_timer = StatsTimer}) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3ba87b00..6266b5ef 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -203,6 +203,42 @@ test_priority_queue() -> {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} = test_priority_queue(Q15), + %% 1-element infinity priority Q + Q16 = priority_queue:in(foo, infinity, Q), + {true, false, 1, [{infinity, foo}], [foo]} = test_priority_queue(Q16), + + %% add infinity to 0-priority Q + Q17 = priority_queue:in(foo, infinity, priority_queue:in(bar, Q)), + {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q17), + + %% and the other way around + Q18 = priority_queue:in(bar, priority_queue:in(foo, infinity, Q)), + {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q18), + + %% add infinity to mixed-priority Q + Q19 = priority_queue:in(qux, infinity, Q3), + {true, false, 3, [{infinity, qux}, {2, bar}, {1, foo}], [qux, bar, foo]} = + test_priority_queue(Q19), + + %% merge the above with a negative priority Q + Q20 = priority_queue:join(Q19, Q4), + {true, false, 4, [{infinity, qux}, {2, bar}, {1, foo}, {-1, foo}], + [qux, bar, foo, foo]} = test_priority_queue(Q20), + + %% merge two infinity priority queues + Q21 = priority_queue:join(priority_queue:in(foo, infinity, Q), + priority_queue:in(bar, infinity, Q)), + {true, false, 2, [{infinity, foo}, {infinity, bar}], [foo, bar]} = + test_priority_queue(Q21), + + %% merge two mixed priority with infinity queues + Q22 = priority_queue:join(Q18, Q20), + {true, false, 6, [{infinity, foo}, {infinity, qux}, {2, bar}, {1, foo}, + {0, bar}, {-1, foo}], [foo, qux, bar, foo, bar, foo]} = + test_priority_queue(Q22), + passed. priority_queue_in_all(Q, L) -> @@ -904,7 +940,6 @@ test_option_parser() -> passed. test_cluster_management() -> - %% 'cluster' and 'reset' should only work if the app is stopped {error, _} = control_action(cluster, []), {error, _} = control_action(reset, []), @@ -952,13 +987,16 @@ test_cluster_management() -> ok = control_action(reset, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_disc_node(), ok = control_action(force_cluster, ["invalid1@invalid", "invalid2@invalid"]), + ok = assert_ram_node(), %% join a non-existing cluster as a ram node ok = control_action(reset, []), ok = control_action(force_cluster, ["invalid1@invalid", "invalid2@invalid"]), + ok = assert_ram_node(), SecondaryNode = rabbit_misc:makenode("hare"), case net_adm:ping(SecondaryNode) of @@ -977,15 +1015,18 @@ test_cluster_management2(SecondaryNode) -> %% make a disk node ok = control_action(reset, []), ok = control_action(cluster, [NodeS]), + ok = assert_disc_node(), %% make a ram node ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), + ok = assert_ram_node(), %% join cluster as a ram node ok = control_action(reset, []), ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_ram_node(), %% change cluster config while remaining in same cluster ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]), @@ -997,27 +1038,45 @@ test_cluster_management2(SecondaryNode) -> "invalid2@invalid"]), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_ram_node(), - %% join empty cluster as a ram node + %% join empty cluster as a ram node (converts to disc) ok = control_action(cluster, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_disc_node(), - %% turn ram node into disk node + %% make a new ram node ok = control_action(reset, []), + ok = control_action(force_cluster, [SecondaryNodeS]), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + ok = assert_ram_node(), + + %% turn ram node into disk node ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_disc_node(), %% convert a disk node into a ram node + ok = assert_disc_node(), ok = control_action(force_cluster, ["invalid1@invalid", "invalid2@invalid"]), + ok = assert_ram_node(), + + %% make a new disk node + ok = control_action(force_reset, []), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + ok = assert_disc_node(), %% turn a disk node into a ram node ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_ram_node(), %% NB: this will log an inconsistent_database error, which is harmless %% Turning cover on / off is OK even if we're not in general using cover, @@ -1043,6 +1102,10 @@ test_cluster_management2(SecondaryNode) -> {error, {no_running_cluster_nodes, _, _}} = control_action(reset, []), + %% attempt to change type when no other node is alive + {error, {no_running_cluster_nodes, _, _}} = + control_action(cluster, [SecondaryNodeS]), + %% leave system clustered, with the secondary node as a ram node ok = control_action(force_reset, []), ok = control_action(start_app, []), @@ -1224,7 +1287,7 @@ test_statistics_event_receiver(Pid) -> test_statistics_receive_event(Ch, Matcher) -> rabbit_channel:flush(Ch), - rabbit_channel:emit_stats(Ch), + Ch ! emit_stats, test_statistics_receive_event1(Ch, Matcher). test_statistics_receive_event1(Ch, Matcher) -> @@ -1581,6 +1644,18 @@ clean_logs(Files, Suffix) -> end || File <- Files], ok. +assert_ram_node() -> + case rabbit_mnesia:is_disc_node() of + true -> exit('not_ram_node'); + false -> ok + end. + +assert_disc_node() -> + case rabbit_mnesia:is_disc_node() of + true -> ok; + false -> exit('not_disc_node') + end. + delete_file(File) -> case file:delete(File) of ok -> ok; diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index a2abb1e5..9739f6b7 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -144,7 +144,7 @@ upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> AfterUs = rabbit_mnesia:read_previously_running_nodes(), - case {is_disc_node(), AfterUs} of + case {is_disc_node_legacy(), AfterUs} of {true, []} -> primary; {true, _} -> @@ -182,12 +182,6 @@ upgrade_mode(AllNodes) -> end end. -is_disc_node() -> - %% This is pretty ugly but we can't start Mnesia and ask it (will hang), - %% we can't look at the config file (may not include us even if we're a - %% disc node). - filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")). - die(Msg, Args) -> %% We don't throw or exit here since that gets thrown %% straight out into do_boot, generating an erl_crash.dump @@ -218,7 +212,7 @@ force_tables() -> secondary_upgrade(AllNodes) -> %% must do this before we wipe out schema - IsDiscNode = is_disc_node(), + IsDiscNode = is_disc_node_legacy(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), %% Note that we cluster with all nodes, rather than all disc nodes @@ -282,6 +276,14 @@ lock_filename() -> lock_filename(dir()). lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). backup_dir() -> dir() ++ "-upgrade-backup". +is_disc_node_legacy() -> + %% This is pretty ugly but we can't start Mnesia and ask it (will + %% hang), we can't look at the config file (may not include us + %% even if we're a disc node). We also can't use + %% rabbit_mnesia:is_disc_node/0 because that will give false + %% postivies on Rabbit up to 2.5.1. + filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")). + %% NB: we cannot use rabbit_log here since it may not have been %% started yet info(Msg, Args) -> error_logger:info_msg(Msg, Args). diff --git a/src/supervisor2.erl b/src/supervisor2.erl index ec1ee9cd..405949ef 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -76,7 +76,6 @@ %% Internal exports -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]). -export([handle_cast/2]). --export([delayed_restart/2]). -define(DICT, dict). @@ -157,9 +156,6 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> end; check_childspecs(X) -> {error, {badarg, X}}. -delayed_restart(Supervisor, RestartDetails) -> - gen_server:cast(Supervisor, {delayed_restart, RestartDetails}). - %%% --------------------------------------------------- %%% %%% Initialize the supervisor. @@ -355,12 +351,19 @@ handle_call(which_children, _From, State) -> State#state.children), {reply, Resp, State}. +%%% Hopefully cause a function-clause as there is no API function +%%% that utilizes cast. +handle_cast(null, State) -> + error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", + []), + + {noreply, State}. -handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) +handle_info({delayed_restart, {RestartType, Reason, Child}}, State) when ?is_simple(State) -> {ok, NState} = do_restart(RestartType, Reason, Child, State), {noreply, NState}; -handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> +handle_info({delayed_restart, {RestartType, Reason, Child}}, State) -> case get_child(Child#child.name, State) of {value, Child1} -> {ok, NState} = do_restart(RestartType, Reason, Child1, State), @@ -369,14 +372,6 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> {noreply, State} end; -%%% Hopefully cause a function-clause as there is no API function -%%% that utilizes cast. -handle_cast(null, State) -> - error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", - []), - - {noreply, State}. - %% %% Take care of terminated children. %% @@ -539,9 +534,9 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> {ok, NState} -> {ok, NState}; {terminate, NState} -> - {ok, _TRef} = timer:apply_after( - trunc(Delay*1000), ?MODULE, delayed_restart, - [self(), {{RestartType, Delay}, Reason, Child}]), + _TRef = erlang:send_after(trunc(Delay*1000), self(), + {delayed_restart, + {{RestartType, Delay}, Reason, Child}}), {ok, state_del_child(Child, NState)} end; do_restart(permanent, Reason, Child, State) -> |