diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-08 17:15:52 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-08 17:15:52 +0100 |
commit | c91aaa2b2b46ef2cf6f5eb80feedec7581ca1846 (patch) | |
tree | 20d3c0f73515e2c7aa3028eba46517068876d467 | |
parent | d4cff88670b118b58be6e19cb58a49979ff67408 (diff) | |
parent | d3e6a52a8dfbbfcfbce31f72dac5d951e9d5888b (diff) | |
download | rabbitmq-server-c91aaa2b2b46ef2cf6f5eb80feedec7581ca1846.tar.gz |
Merging v2_5 branch to default
52 files changed, 3672 insertions, 1318 deletions
@@ -20,6 +20,8 @@ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) +QC_MODULES := rabbit_backing_queue_qc +QC_TRIALS ?= 100 ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python @@ -45,8 +47,14 @@ ifndef USE_SPECS USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8,4]), halt().') endif +ifndef USE_PROPER_QC +# PropEr needs to be installed for property checking +# http://proper.softlab.ntua.gr/ +USE_PROPER_QC:=$(shell erl -noshell -eval 'io:format({module, proper} =:= code:ensure_loaded(proper)), halt().') +endif + #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests -ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs) +ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) VERSION=0.0.0 TARBALL_NAME=rabbitmq-server-$(VERSION) @@ -69,6 +77,10 @@ define usage_dep $(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl endef +define boolean_macro +$(if $(filter true,$(1)),-D$(2)) +endef + ifneq "$(SBIN_DIR)" "" ifneq "$(TARGET_DIR)" "" SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) @@ -165,6 +177,9 @@ run-tests: all OUT=$$(echo "rabbit_tests:all_tests()." | $(ERL_CALL)) ; \ echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null +run-qc: all + $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS)) + start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ @@ -223,7 +238,7 @@ srcdist: distclean chmod 0755 $(TARGET_SRC_DIR)/scripts/* (cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME)) - (cd dist; zip -r $(TARBALL_NAME).zip $(TARBALL_NAME)) + (cd dist; zip -q -r $(TARBALL_NAME).zip $(TARBALL_NAME)) rm -rf $(TARGET_SRC_DIR) distclean: clean @@ -314,3 +329,4 @@ ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" "" -include $(DEPS_FILE) endif +.PHONY: run-qc diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 06dcfff7..ee000215 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -513,17 +513,22 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>set_user_tags</command> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>tag</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> <term>username</term> - <listitem><para>The name of the user whose administrative - status is to be set.</para></listitem> + <listitem><para>The name of the user whose tags are to + be set.</para></listitem> + </varlistentry> + <varlistentry> + <term>tag</term> + <listitem><para>Zero, one or more tags to set. Any + existing tags will be removed.</para></listitem> </varlistentry> </variablelist> <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl set_admin tonyg</screen> + <screen role="example">rabbitmqctl set_user_tags tonyg administrator</screen> <para role="example"> This command instructs the RabbitMQ broker to ensure the user named <command>tonyg</command> is an administrator. This has no @@ -532,24 +537,10 @@ user logs in via some other means (for example with the management plugin). </para> - </listitem> - </varlistentry> - - <varlistentry> - <term><cmdsynopsis><command>clear_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> - <listitem> - <variablelist> - <varlistentry> - <term>username</term> - <listitem><para>The name of the user whose administrative - status is to be cleared.</para></listitem> - </varlistentry> - </variablelist> - <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl clear_admin tonyg</screen> + <screen role="example">rabbitmqctl set_user_tags tonyg</screen> <para role="example"> - This command instructs the RabbitMQ broker to ensure the user - named <command>tonyg</command> is not an administrator. + This command instructs the RabbitMQ broker to remove any + tags from the user named <command>tonyg</command>. </para> </listitem> </varlistentry> @@ -1209,6 +1200,10 @@ <listitem><para>True if the channel is in transactional mode, false otherwise.</para></listitem> </varlistentry> <varlistentry> + <term>confirm</term> + <listitem><para>True if the channel is in confirm mode, false otherwise.</para></listitem> + </varlistentry> + <varlistentry> <term>consumer_count</term> <listitem><para>Number of logical AMQP consumers retrieving messages via the channel.</para></listitem> @@ -1219,11 +1214,22 @@ yet acknowledged.</para></listitem> </varlistentry> <varlistentry> + <term>messages_uncommitted</term> + <listitem><para>Number of messages received in an as yet + uncommitted transaction.</para></listitem> + </varlistentry> + <varlistentry> <term>acks_uncommitted</term> <listitem><para>Number of acknowledgements received in an as yet uncommitted transaction.</para></listitem> </varlistentry> <varlistentry> + <term>messages_unconfirmed</term> + <listitem><para>Number of published messages not yet + confirmed. On channels not in confirm mode, this + remains 0.</para></listitem> + </varlistentry> + <varlistentry> <term>prefetch_count</term> <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem> </varlistentry> @@ -1235,21 +1241,10 @@ messages to the channel's consumers. </para></listitem> </varlistentry> - <varlistentry> - <term>confirm</term> - <listitem><para>True if the channel is in confirm mode, false otherwise.</para></listitem> - </varlistentry> - <varlistentry> - <term>messages_unconfirmed</term> - <listitem><para>Number of published messages not yet - confirmed. On channels not in confirm mode, this - remains 0.</para></listitem> - </varlistentry> </variablelist> <para> If no <command>channelinfoitem</command>s are specified then pid, - user, transactional, consumer_count, and - messages_unacknowledged are assumed. + user, consumer_count, and messages_unacknowledged are assumed. </para> <para role="example-prefix"> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 7dabb8c3..65a3269a 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -21,18 +21,17 @@ {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, {frame_max, 131072}, - {persister_max_wrap_entries, 500}, - {persister_hibernate_after, 10000}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 262144}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, - {default_user_is_admin, true}, + {default_user_tags, [administrator]}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, {cluster_nodes, []}, {server_properties, []}, {collect_statistics, none}, + {collect_statistics_interval, 5000}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index db4773b8..ac6399c6 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -15,12 +15,12 @@ %% -record(user, {username, - is_admin, + tags, auth_backend, %% Module this user came from impl %% Scratch space for that module }). --record(internal_user, {username, password_hash, is_admin}). +-record(internal_user, {username, password_hash, tags}). -record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). @@ -42,11 +42,12 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, internal, arguments}). +-record(exchange, {name, type, durable, auto_delete, internal, arguments, + scratch}). -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid}). + arguments, pid, slave_pids, mirror_nodes}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). @@ -67,8 +68,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, txn, sender, message, - msg_seq_no}). +-record(delivery, {mandatory, immediate, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). @@ -86,7 +86,6 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --define(STATS_INTERVAL, 5000). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl index e26d44ea..803bb75c 100644 --- a/include/rabbit_auth_backend_spec.hrl +++ b/include/rabbit_auth_backend_spec.hrl @@ -22,8 +22,7 @@ {'ok', rabbit_types:user()} | {'refused', string(), [any()]} | {'error', any()}). --spec(check_vhost_access/3 :: (rabbit_types:user(), rabbit_types:vhost(), - rabbit_access_control:vhost_permission_atom()) -> +-spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> boolean() | {'error', any()}). -spec(check_resource_access/3 :: (rabbit_types:user(), rabbit_types:r(atom()), diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 295d9039..ee102f5e 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -26,12 +26,11 @@ fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). --type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(), - async_callback(), sync_callback()) -> state()). +-spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(), + async_callback()) -> state()). -spec(terminate/2 :: (any(), state()) -> state()). -spec(delete_and_terminate/2 :: (any(), state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). @@ -51,14 +50,6 @@ -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). --spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) -> - state()). --spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). --spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). --spec(tx_commit/4 :: - (rabbit_types:txn(), fun (() -> any()), - message_properties_transformer(), state()) -> {[ack()], state()}). -spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) -> {[rabbit_guid:guid()], state()}). -spec(len/1 :: (state()) -> non_neg_integer()). @@ -71,7 +62,7 @@ -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). -spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). --spec(is_duplicate/3 :: - (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> +-spec(is_duplicate/2 :: + (rabbit_types:basic_message(), state()) -> {'false'|'published'|'discarded', state()}). -spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()). diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 31979a8e..38c81134 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -19,7 +19,7 @@ all: package: clean cp $(TARBALL_DIR)/$(TARBALL) $(DEBIAN_ORIG_TARBALL) - tar -zxvf $(DEBIAN_ORIG_TARBALL) + tar -zxf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ # Debian and descendants differ from most other distros in that diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile index c4e01f4a..b5c342aa 100644 --- a/packaging/generic-unix/Makefile +++ b/packaging/generic-unix/Makefile @@ -4,7 +4,7 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION) dist: - tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz + tar -zxf ../../dist/$(SOURCE_DIR).tar.gz $(MAKE) -C $(SOURCE_DIR) \ TARGET_DIR=`pwd`/$(TARGET_DIR) \ diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index 809f518b..4a866305 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -5,7 +5,7 @@ PortSystem 1.0 name rabbitmq-server version @VERSION@ categories net -maintainers paperplanes.de:meyer rabbitmq.com:tonyg openmaintainer +maintainers paperplanes.de:meyer openmaintainer platforms darwin supported_archs noarch @@ -24,11 +24,9 @@ distfiles ${name}-${version}${extract.suffix} \ checksums \ ${name}-${version}${extract.suffix} \ - md5 @md5-src@ \ sha1 @sha1-src@ \ rmd160 @rmd160-src@ \ ${name}-generic-unix-${version}${extract.suffix} \ - md5 @md5-bin@ \ sha1 @sha1-bin@ \ rmd160 @rmd160-bin@ diff --git a/packaging/macports/make-checksums.sh b/packaging/macports/make-checksums.sh index 11424dfc..891de6ba 100755 --- a/packaging/macports/make-checksums.sh +++ b/packaging/macports/make-checksums.sh @@ -6,7 +6,7 @@ for type in src bin do tarball_var=tarball_${type} tarball=${!tarball_var} - for algo in md5 sha1 rmd160 + for algo in sha1 rmd160 do checksum=$(openssl $algo ${tarball} | awk '{print $NF}') echo "s|@$algo-$type@|$checksum|g" diff --git a/packaging/windows-exe/Makefile b/packaging/windows-exe/Makefile index 59803f9c..ab50e30b 100644 --- a/packaging/windows-exe/Makefile +++ b/packaging/windows-exe/Makefile @@ -2,7 +2,7 @@ VERSION=0.0.0 ZIP=../windows/rabbitmq-server-windows-$(VERSION) dist: rabbitmq-$(VERSION).nsi rabbitmq_server-$(VERSION) - makensis rabbitmq-$(VERSION).nsi + makensis -V2 rabbitmq-$(VERSION).nsi rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in sed \ @@ -10,7 +10,7 @@ rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in $< > $@ rabbitmq_server-$(VERSION): - unzip $(ZIP) + unzip -q $(ZIP) clean: rm -rf rabbitmq-*.nsi rabbitmq_server-* rabbitmq-server-*.exe diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in index 1ed4064e..27e4e1dc 100644 --- a/packaging/windows-exe/rabbitmq_nsi.in +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -113,17 +113,17 @@ Section "Start Menu" RabbitStartMenu CreateDirectory "$APPDATA\RabbitMQ\db" CreateDirectory "$SMPROGRAMS\RabbitMQ Server" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0 - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Plugins Directory.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Log Directory.lnk" "$APPDATA\RabbitMQ\log" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Database Directory.lnk" "$APPDATA\RabbitMQ\db" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\(Re)Install Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Remove Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Start Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Stop Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall RabbitMQ.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0 + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Plugins.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Logs.lnk" "$APPDATA\RabbitMQ\log" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Database Directory.lnk" "$APPDATA\RabbitMQ\db" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - (re)install.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - remove.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - start.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Service - stop.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico" SetOutPath "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin" - CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Command Prompt (sbin dir).lnk" "$WINDIR\system32\cmd.exe" "" "$WINDIR\system32\cmd.exe" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\RabbitMQ Command Prompt (sbin dir).lnk" "$WINDIR\system32\cmd.exe" "" "$WINDIR\system32\cmd.exe" SetOutPath $INSTDIR SectionEnd diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index dacfa620..a0be8d89 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -4,7 +4,7 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_ZIP=rabbitmq-server-windows-$(VERSION) dist: - tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz + tar -zxf ../../dist/$(SOURCE_DIR).tar.gz $(MAKE) -C $(SOURCE_DIR) mkdir $(SOURCE_DIR)/sbin @@ -24,7 +24,7 @@ dist: elinks -dump -no-references -no-numbering rabbitmq-service.html \ > $(TARGET_DIR)/readme-service.txt todos $(TARGET_DIR)/readme-service.txt - zip -r $(TARGET_ZIP).zip $(TARGET_DIR) + zip -q -r $(TARGET_ZIP).zip $(TARGET_DIR) rm -rf $(TARGET_DIR) rabbitmq-service.html clean: clean_partial diff --git a/quickcheck b/quickcheck new file mode 100755 index 00000000..a36cf3ed --- /dev/null +++ b/quickcheck @@ -0,0 +1,36 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -sname quickcheck +-mode(compile). + +%% A helper to test quickcheck properties on a running broker +%% NodeStr is a local broker node name +%% ModStr is the module containing quickcheck properties +%% The number of trials is optional +main([NodeStr, ModStr | TrialsStr]) -> + {ok, Hostname} = inet:gethostname(), + Node = list_to_atom(NodeStr ++ "@" ++ Hostname), + Mod = list_to_atom(ModStr), + Trials = lists:map(fun erlang:list_to_integer/1, TrialsStr), + case rpc:call(Node, code, ensure_loaded, [proper]) of + {module, proper} -> + case rpc:call(Node, proper, module, [Mod] ++ Trials) of + [] -> ok; + _ -> quit(1) + end; + {badrpc, Reason} -> + io:format("Could not contact node ~p: ~p.~n", [Node, Reason]), + quit(2); + {error,nofile} -> + io:format("Module PropEr was not found on node ~p~n", [Node]), + quit(2) + end; +main([]) -> + io:format("This script requires a node name and a module.~n"). + +quit(Status) -> + case os:type() of + {unix, _} -> halt(Status); + {win32, _} -> init:stop(Status) + end. + diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 61b08d49..235e14c0 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -925,10 +925,10 @@ handle_cast({transfer, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), {noreply, process_pending( update_counts(obtain, ToPid, +1, - update_counts(obtain, FromPid, -1, State)))}; + update_counts(obtain, FromPid, -1, State)))}. -handle_cast(check_counts, State) -> - {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}. +handle_info(check_counts, State) -> + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #fhc_state { elders = Elders, @@ -1133,9 +1133,9 @@ reduce(State = #fhc_state { open_pending = OpenPending, end end, case TRef of - undefined -> {ok, TRef1} = timer:apply_after( - ?FILE_HANDLES_CHECK_INTERVAL, - gen_server, cast, [?SERVER, check_counts]), + undefined -> TRef1 = erlang:send_after( + ?FILE_HANDLES_CHECK_INTERVAL, ?SERVER, + check_counts), State #fhc_state { timer_ref = TRef1 }; _ -> State end. diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 43e0a8f5..35258139 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -67,6 +67,11 @@ %% module. Note there is no form also encompassing a reply, thus if %% you wish to reply in handle_call/3 and change the callback module, %% you need to use gen_server2:reply/2 to issue the reply manually. +%% +%% 8) The callback module can optionally implement +%% format_message_queue/2 which is the equivalent of format_status/2 +%% but where the second argument is specifically the priority_queue +%% which contains the prioritised message_queue. %% All modifications are (C) 2009-2011 VMware, Inc. @@ -593,41 +598,35 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, CurrentTO1 = Base + Extra, {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. -in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC, - queue = Queue }) -> - GS2State #gs2_state { queue = priority_queue:in( - {'$gen_cast', Msg}, - PC(Msg, GS2State), Queue) }; -in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC, - queue = Queue }) -> - GS2State #gs2_state { queue = priority_queue:in( - {'$gen_call', From, Msg}, - PC(Msg, From, GS2State), Queue) }; -in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) -> - GS2State #gs2_state { queue = priority_queue:in( - Input, PI(Input, GS2State), Queue) }. - -process_msg(Msg, - GS2State = #gs2_state { parent = Parent, - name = Name, - debug = Debug }) -> - case Msg of - {system, From, Req} -> - sys:handle_system_msg( - Req, From, Parent, ?MODULE, Debug, - GS2State); - %% gen_server puts Hib on the end as the 7th arg, but that - %% version of the function seems not to be documented so - %% leaving out for now. - {'EXIT', Parent, Reason} -> - terminate(Reason, Msg, GS2State); - _Msg when Debug =:= [] -> - handle_msg(Msg, GS2State); - _Msg -> - Debug1 = sys:handle_debug(Debug, fun print_event/3, - Name, {in, Msg}), - handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }) - end. +in({'$gen_cast', Msg} = Input, + GS2State = #gs2_state { prioritise_cast = PC }) -> + in(Input, PC(Msg, GS2State), GS2State); +in({'$gen_call', From, Msg} = Input, + GS2State = #gs2_state { prioritise_call = PC }) -> + in(Input, PC(Msg, From, GS2State), GS2State); +in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) -> + in(Input, infinity, GS2State); +in({system, _From, _Req} = Input, GS2State) -> + in(Input, infinity, GS2State); +in(Input, GS2State = #gs2_state { prioritise_info = PI }) -> + in(Input, PI(Input, GS2State), GS2State). + +in(Input, Priority, GS2State = #gs2_state { queue = Queue }) -> + GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }. + +process_msg({system, From, Req}, + GS2State = #gs2_state { parent = Parent, debug = Debug }) -> + sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State); +process_msg({'EXIT', Parent, Reason} = Msg, + GS2State = #gs2_state { parent = Parent }) -> + %% gen_server puts Hib on the end as the 7th arg, but that version + %% of the fun seems not to be documented so leaving out for now. + terminate(Reason, Msg, GS2State); +process_msg(Msg, GS2State = #gs2_state { debug = [] }) -> + handle_msg(Msg, GS2State); +process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) -> + Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}), + handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }). %%% --------------------------------------------------- %%% Send/recive functions @@ -1161,17 +1160,22 @@ format_status(Opt, StatusData) -> end, Header = lists:concat(["Status for generic server ", NameTag]), Log = sys:get_debug(log, Debug, []), - Specfic = - case erlang:function_exported(Mod, format_status, 2) of - true -> case catch Mod:format_status(Opt, [PDict, State]) of - {'EXIT', _} -> [{data, [{"State", State}]}]; - Else -> Else - end; - _ -> [{data, [{"State", State}]}] - end, + Specfic = callback(Mod, format_status, [Opt, [PDict, State]], + fun () -> [{data, [{"State", State}]}] end), + Messages = callback(Mod, format_message_queue, [Opt, Queue], + fun () -> priority_queue:to_list(Queue) end), [{header, Header}, {data, [{"Status", SysState}, {"Parent", Parent}, {"Logged events", Log}, - {"Queued messages", priority_queue:to_list(Queue)}]} | + {"Queued messages", Messages}]} | Specfic]. + +callback(Mod, FunName, Args, DefaultThunk) -> + case erlang:function_exported(Mod, FunName, length(Args)) of + true -> case catch apply(Mod, FunName, Args) of + {'EXIT', _} -> DefaultThunk(); + Success -> Success + end; + false -> DefaultThunk() + end. @@ -376,11 +376,11 @@ confirmed_broadcast/2, group_members/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_cast/2, prioritise_info/2]). + code_change/3, prioritise_info/2]). -export([behaviour_info/1]). --export([table_definitions/0, flush/1]). +-export([table_definitions/0]). -define(GROUP_TABLE, gm_group). -define(HIBERNATE_AFTER_MIN, 1000). @@ -511,9 +511,6 @@ confirmed_broadcast(Server, Msg) -> group_members(Server) -> gen_server2:call(Server, group_members, infinity). -flush(Server) -> - gen_server2:cast(Server, flush). - init([GroupName, Module, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), @@ -629,12 +626,12 @@ handle_cast(join, State = #state { self = Self, {Module:joined(Args, all_known_members(View)), State1}); handle_cast(leave, State) -> - {stop, normal, State}; + {stop, normal, State}. -handle_cast(flush, State) -> - noreply( - flush_broadcast_buffer(State #state { broadcast_timer = undefined })). +handle_info(flush, State) -> + noreply( + flush_broadcast_buffer(State #state { broadcast_timer = undefined })); handle_info({'DOWN', MRef, process, _Pid, _Reason}, State = #state { self = Self, @@ -684,9 +681,7 @@ terminate(Reason, State = #state { module = Module, code_change(_OldVsn, State, _Extra) -> {ok, State}. -prioritise_cast(flush, _State) -> 1; -prioritise_cast(_ , _State) -> 0. - +prioritise_info(flush, _State) -> 1; prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1; prioritise_info(_ , _State) -> 0. @@ -808,10 +803,10 @@ ensure_broadcast_timer(State = #state { broadcast_buffer = [], State; ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = TRef }) -> - timer:cancel(TRef), + erlang:cancel_timer(TRef), State #state { broadcast_timer = undefined }; ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> - {ok, TRef} = timer:apply_after(?BROADCAST_TIMER, ?MODULE, flush, [self()]), + TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush), State #state { broadcast_timer = TRef }; ensure_broadcast_timer(State) -> State. diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 4a94b24b..4fc8b469 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -47,7 +47,10 @@ -ifdef(use_specs). --type(priority() :: integer()). +-export_type([q/0]). + +-type(q() :: pqueue()). +-type(priority() :: integer() | 'infinity'). -type(squeue() :: {queue, [any()], [any()]}). -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). @@ -71,8 +74,9 @@ new() -> is_queue({queue, R, F}) when is_list(R), is_list(F) -> true; is_queue({pqueue, Queues}) when is_list(Queues) -> - lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end, - Queues); + lists:all(fun ({infinity, Q}) -> is_queue(Q); + ({P, Q}) -> is_integer(P) andalso is_queue(Q) + end, Queues); is_queue(_) -> false. @@ -89,7 +93,8 @@ len({pqueue, Queues}) -> to_list({queue, In, Out}) when is_list(In), is_list(Out) -> [{0, V} || V <- Out ++ lists:reverse(In, [])]; to_list({pqueue, Queues}) -> - [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)]. + [{maybe_negate_priority(P), V} || {P, Q} <- Queues, + {0, V} <- to_list(Q)]. in(Item, Q) -> in(Item, 0, Q). @@ -103,12 +108,20 @@ in(X, Priority, _Q = {queue, [], []}) -> in(X, Priority, Q = {queue, _, _}) -> in(X, Priority, {pqueue, [{0, Q}]}); in(X, Priority, {pqueue, Queues}) -> - P = -Priority, + P = maybe_negate_priority(Priority), {pqueue, case lists:keysearch(P, 1, Queues) of {value, {_, Q}} -> lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); + false when P == infinity -> + [{P, {queue, [X], []}} | Queues]; false -> - lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + case Queues of + [{infinity, InfQueue} | Queues1] -> + [{infinity, InfQueue} | + lists:keysort(1, [{P, {queue, [X], []}} | Queues1])]; + _ -> + lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + end end}. out({queue, [], []} = Q) -> @@ -141,7 +154,8 @@ join({queue, [], []}, B) -> join({queue, AIn, AOut}, {queue, BIn, BOut}) -> {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; join(A = {queue, _, _}, {pqueue, BPQ}) -> - {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + {Pre, Post} = + lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ), Post1 = case Post of [] -> [ {0, A} ]; [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; @@ -149,7 +163,8 @@ join(A = {queue, _, _}, {pqueue, BPQ}) -> end, {pqueue, Pre ++ Post1}; join({pqueue, APQ}, B = {queue, _, _}) -> - {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + {Pre, Post} = + lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ), Post1 = case Post of [] -> [ {0, B} ]; [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; @@ -165,7 +180,7 @@ merge(APQ, [], Acc) -> lists:reverse(Acc, APQ); merge([{P, A}|As], [{P, B}|Bs], Acc) -> merge(As, Bs, [ {P, join(A, B)} | Acc ]); -merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity -> merge(As, Bs, [ {PA, A} | Acc ]); merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> merge(As, Bs, [ {PB, B} | Acc ]). @@ -174,3 +189,6 @@ r2f([]) -> {queue, [], []}; r2f([_] = R) -> {queue, [], R}; r2f([X,Y]) -> {queue, [X], [Y]}; r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}. + +maybe_negate_priority(infinity) -> infinity; +maybe_negate_priority(P) -> -P. diff --git a/src/rabbit.erl b/src/rabbit.erl index 8866a1b7..e067607d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -23,7 +23,7 @@ -export([start/2, stop/1]). --export([log_location/1]). +-export([log_location/1]). %% for testing %%--------------------------------------------------------------------------- %% Boot steps. @@ -134,6 +134,18 @@ {requires, empty_db_check}, {enables, routing_ready}]}). +-rabbit_boot_step({mirror_queue_slave_sup, + [{description, "mirror queue slave sup"}, + {mfa, {rabbit_mirror_queue_slave_sup, start, []}}, + {requires, recovery}, + {enables, routing_ready}]}). + +-rabbit_boot_step({mirrored_queues, + [{description, "adding mirrors to queues"}, + {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, + {requires, mirror_queue_slave_sup}, + {enables, routing_ready}]}). + -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}, {requires, core_initialized}]}). @@ -201,14 +213,14 @@ prepare() -> start() -> try ok = prepare(), - ok = rabbit_misc:start_applications(?APPS) + ok = rabbit_misc:start_applications(application_load_order()) after %%give the error loggers some time to catch up timer:sleep(100) end. stop() -> - ok = rabbit_misc:stop_applications(?APPS). + ok = rabbit_misc:stop_applications(application_load_order()). stop_and_halt() -> try @@ -267,20 +279,51 @@ stop(_State) -> ok. %%--------------------------------------------------------------------------- +%% application life cycle + +application_load_order() -> + ok = load_applications(), + {ok, G} = rabbit_misc:build_acyclic_graph( + fun (App, _Deps) -> [{App, App}] end, + fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end, + [{App, app_dependencies(App)} || + {App, _Desc, _Vsn} <- application:loaded_applications()]), + true = digraph:del_vertices( + G, digraph:vertices(G) -- digraph_utils:reachable(?APPS, G)), + Result = digraph_utils:topsort(G), + true = digraph:delete(G), + Result. + +load_applications() -> + load_applications(queue:from_list(?APPS), sets:new()). + +load_applications(Worklist, Loaded) -> + case queue:out(Worklist) of + {empty, _WorkList} -> + ok; + {{value, App}, Worklist1} -> + case sets:is_element(App, Loaded) of + true -> load_applications(Worklist1, Loaded); + false -> case application:load(App) of + ok -> ok; + {error, {already_loaded, App}} -> ok; + Error -> throw(Error) + end, + load_applications( + queue:join(Worklist1, + queue:from_list(app_dependencies(App))), + sets:add_element(App, Loaded)) + end + end. -erts_version_check() -> - FoundVer = erlang:system_info(version), - case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of - true -> ok; - false -> {error, {erlang_version_too_old, - {found, FoundVer}, {required, ?ERTS_MINIMUM}}} +app_dependencies(App) -> + case application:get_key(App, applications) of + undefined -> []; + {ok, Lst} -> Lst end. -boot_error(Format, Args) -> - io:format("BOOT ERROR: " ++ Format, Args), - error_logger:error_msg(Format, Args), - timer:sleep(1000), - exit({?MODULE, failure_during_boot}). +%%--------------------------------------------------------------------------- +%% boot step logic run_boot_step({StepName, Attributes}) -> Description = case lists:keysearch(description, 1, Attributes) of @@ -355,83 +398,46 @@ sort_boot_steps(UnsortedSteps) -> end]) end. +boot_error(Format, Args) -> + io:format("BOOT ERROR: " ++ Format, Args), + error_logger:error_msg(Format, Args), + timer:sleep(1000), + exit({?MODULE, failure_during_boot}). + %%--------------------------------------------------------------------------- +%% boot step functions -log_location(Type) -> - case application:get_env(Type, case Type of - kernel -> error_logger; - sasl -> sasl_error_logger - end) of - {ok, {file, File}} -> File; - {ok, false} -> undefined; - {ok, tty} -> tty; - {ok, silent} -> undefined; - {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}}); - _ -> undefined - end. +boot_delegate() -> + {ok, Count} = application:get_env(rabbit, delegate_count), + rabbit_sup:start_child(delegate_sup, [Count]). -app_location() -> - {ok, Application} = application:get_application(), - filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")). +recover() -> + rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()). -home_dir() -> - case init:get_argument(home) of - {ok, [[Home]]} -> Home; - Other -> Other +maybe_insert_default_data() -> + case rabbit_mnesia:is_db_empty() of + true -> insert_default_data(); + false -> ok end. -config_files() -> - case init:get_argument(config) of - {ok, Files} -> [filename:absname( - filename:rootname(File, ".config") ++ ".config") || - File <- Files]; - error -> [] - end. +insert_default_data() -> + {ok, DefaultUser} = application:get_env(default_user), + {ok, DefaultPass} = application:get_env(default_pass), + {ok, DefaultTags} = application:get_env(default_user_tags), + {ok, DefaultVHost} = application:get_env(default_vhost), + {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = + application:get_env(default_permissions), + ok = rabbit_vhost:add(DefaultVHost), + ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass), + ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags), + ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost, + DefaultConfigurePerm, + DefaultWritePerm, + DefaultReadPerm), + ok. %%--------------------------------------------------------------------------- - -print_banner() -> - {ok, Product} = application:get_key(id), - {ok, Version} = application:get_key(vsn), - ProductLen = string:len(Product), - io:format("~n" - "+---+ +---+~n" - "| | | |~n" - "| | | |~n" - "| | | |~n" - "| +---+ +-------+~n" - "| |~n" - "| ~s +---+ |~n" - "| | | |~n" - "| ~s +---+ |~n" - "| |~n" - "+-------------------+~n" - "~s~n~s~n~s~n~n", - [Product, string:right([$v|Version], ProductLen), - ?PROTOCOL_VERSION, - ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), - Settings = [{"node", node()}, - {"app descriptor", app_location()}, - {"home dir", home_dir()}, - {"config file(s)", config_files()}, - {"cookie hash", rabbit_misc:cookie_hash()}, - {"log", log_location(kernel)}, - {"sasl log", log_location(sasl)}, - {"database dir", rabbit_mnesia:dir()}, - {"erlang version", erlang:system_info(version)}], - DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]), - Format = fun (K, V) -> - io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", - [K, V]) - end, - lists:foreach(fun ({"config file(s)" = K, []}) -> - Format(K, "(none)"); - ({"config file(s)" = K, [V0 | Vs]}) -> - Format(K, V0), [Format("", V) || V <- Vs]; - ({K, V}) -> - Format(K, V) - end, Settings), - io:nl(). +%% logging ensure_working_log_handlers() -> Handlers = gen_event:which_handlers(error_logger), @@ -470,38 +476,19 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, end end. -boot_delegate() -> - {ok, Count} = application:get_env(rabbit, delegate_count), - rabbit_sup:start_child(delegate_sup, [Count]). - -recover() -> - rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()). - -maybe_insert_default_data() -> - case rabbit_mnesia:is_db_empty() of - true -> insert_default_data(); - false -> ok +log_location(Type) -> + case application:get_env(Type, case Type of + kernel -> error_logger; + sasl -> sasl_error_logger + end) of + {ok, {file, File}} -> File; + {ok, false} -> undefined; + {ok, tty} -> tty; + {ok, silent} -> undefined; + {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}}); + _ -> undefined end. -insert_default_data() -> - {ok, DefaultUser} = application:get_env(default_user), - {ok, DefaultPass} = application:get_env(default_pass), - {ok, DefaultAdmin} = application:get_env(default_user_is_admin), - {ok, DefaultVHost} = application:get_env(default_vhost), - {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = - application:get_env(default_permissions), - ok = rabbit_vhost:add(DefaultVHost), - ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass), - case DefaultAdmin of - true -> rabbit_auth_backend_internal:set_admin(DefaultUser); - _ -> ok - end, - ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost, - DefaultConfigurePerm, - DefaultWritePerm, - DefaultReadPerm), - ok. - rotate_logs(File, Suffix, Handler) -> rotate_logs(File, Suffix, Handler, Handler). @@ -524,3 +511,75 @@ log_rotation_result(ok, {error, SaslLogError}) -> {error, {cannot_rotate_sasl_logs, SaslLogError}}; log_rotation_result(ok, ok) -> ok. + +%%--------------------------------------------------------------------------- +%% misc + +erts_version_check() -> + FoundVer = erlang:system_info(version), + case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of + true -> ok; + false -> {error, {erlang_version_too_old, + {found, FoundVer}, {required, ?ERTS_MINIMUM}}} + end. + +print_banner() -> + {ok, Product} = application:get_key(id), + {ok, Version} = application:get_key(vsn), + ProductLen = string:len(Product), + io:format("~n" + "+---+ +---+~n" + "| | | |~n" + "| | | |~n" + "| | | |~n" + "| +---+ +-------+~n" + "| |~n" + "| ~s +---+ |~n" + "| | | |~n" + "| ~s +---+ |~n" + "| |~n" + "+-------------------+~n" + "~s~n~s~n~s~n~n", + [Product, string:right([$v|Version], ProductLen), + ?PROTOCOL_VERSION, + ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), + Settings = [{"node", node()}, + {"app descriptor", app_location()}, + {"home dir", home_dir()}, + {"config file(s)", config_files()}, + {"cookie hash", rabbit_misc:cookie_hash()}, + {"log", log_location(kernel)}, + {"sasl log", log_location(sasl)}, + {"database dir", rabbit_mnesia:dir()}, + {"erlang version", erlang:system_info(version)}], + DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]), + Format = fun (K, V) -> + io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", + [K, V]) + end, + lists:foreach(fun ({"config file(s)" = K, []}) -> + Format(K, "(none)"); + ({"config file(s)" = K, [V0 | Vs]}) -> + Format(K, V0), [Format("", V) || V <- Vs]; + ({K, V}) -> + Format(K, V) + end, Settings), + io:nl(). + +app_location() -> + {ok, Application} = application:get_application(), + filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")). + +home_dir() -> + case init:get_argument(home) of + {ok, [[Home]]} -> Home; + Other -> Other + end. + +config_files() -> + case init:get_argument(config) of + {ok, Files} -> [filename:absname( + filename:rootname(File, ".config") ++ ".config") || + File <- Files]; + error -> [] + end. diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 59c00848..c0ae18c0 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -19,16 +19,15 @@ -include("rabbit.hrl"). -export([check_user_pass_login/2, check_user_login/2, - check_vhost_access/2, check_resource_access/3, list_vhosts/2]). + check_vhost_access/2, check_resource_access/3]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([permission_atom/0, vhost_permission_atom/0]). +-export_type([permission_atom/0]). -type(permission_atom() :: 'configure' | 'read' | 'write'). --type(vhost_permission_atom() :: 'read' | 'write'). -spec(check_user_pass_login/2 :: (rabbit_types:username(), rabbit_types:password()) @@ -39,8 +38,6 @@ -spec(check_resource_access/3 :: (rabbit_types:user(), rabbit_types:r(atom()), permission_atom()) -> 'ok' | rabbit_types:channel_exit()). --spec(list_vhosts/2 :: (rabbit_types:user(), vhost_permission_atom()) - -> [rabbit_types:vhost()]). -endif. @@ -70,7 +67,7 @@ check_vhost_access(User = #user{ username = Username, check_access( fun() -> rabbit_vhost:exists(VHostPath) andalso - Module:check_vhost_access(User, VHostPath, write) + Module:check_vhost_access(User, VHostPath) end, "~s failed checking vhost access to ~s for ~s: ~p~n", [Module, VHostPath, Username], @@ -104,21 +101,3 @@ check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) -> false -> rabbit_misc:protocol_error(access_refused, RefStr, RefArgs) end. - -%% Permission = write -> log in -%% Permission = read -> learn of the existence of (only relevant for -%% management plugin) -list_vhosts(User = #user{username = Username, auth_backend = Module}, - Permission) -> - lists:filter( - fun(VHost) -> - case Module:check_vhost_access(User, VHost, Permission) of - {error, _} = E -> - rabbit_log:warning("~w failed checking vhost access " - "to ~s for ~s: ~p~n", - [Module, VHost, Username, E]), - false; - Else -> - Else - end - end, rabbit_vhost:list()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c8703740..a7c92e51 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -20,20 +20,19 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/4, reject/4]). + stat/1, deliver/2, requeue/3, ack/3, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). --export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). +-export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +-export([store_queue/1]). + %% internal --export([internal_declare/2, internal_delete/1, - run_backing_queue/3, run_backing_queue_async/3, - sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1, drop_expired/1, - emit_stats/1]). +-export([internal_declare/2, internal_delete/1, run_backing_queue/3, + set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -99,7 +98,6 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). --spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') @@ -115,12 +113,8 @@ -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). --spec(ack/4 :: - (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid()) - -> 'ok'). +-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). --spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()). --spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> @@ -145,14 +139,8 @@ -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(run_backing_queue_async/3 :: - (pid(), atom(), - (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(sync_timeout/1 :: (pid()) -> 'ok'). --spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). --spec(maybe_expire/1 :: (pid()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). @@ -191,18 +179,21 @@ find_durable_queues() -> end). recover_durable_queues(DurableQueues) -> - Qs = [start_queue_process(Q) || Q <- DurableQueues], + Qs = [start_queue_process(node(), Q) || Q <- DurableQueues], [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs, gen_server2:call(Pid, {init, true}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q = start_queue_process(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none}), + {Node, MNodes} = determine_queue_nodes(Args), + Q = start_queue_process(Node, #amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + mirror_nodes = MNodes}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -233,15 +224,31 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> end). store_queue(Q = #amqqueue{durable = true}) -> - ok = mnesia:write(rabbit_durable_queue, Q, write), + ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write), ok = mnesia:write(rabbit_queue, Q, write), ok; store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -start_queue_process(Q) -> - {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), +determine_queue_nodes(Args) -> + Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), + PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), + case {Policy, PolicyParams} of + {{_Type, <<"nodes">>}, {array, Nodes}} -> + case [list_to_atom(binary_to_list(Node)) || + {longstr, Node} <- Nodes] of + [Node] -> {Node, undefined}; + [First | Rest] -> {First, Rest} + end; + {{_Type, <<"all">>}, _} -> + {node(), all}; + _ -> + {node(), undefined} + end. + +start_queue_process(Node, Q) -> + {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> @@ -257,8 +264,13 @@ lookup(Name) -> with(Name, F, E) -> case lookup(Name) of - {ok, Q} -> rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); - {error, not_found} -> E() + {ok, Q = #amqqueue{slave_pids = []}} -> + rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); + {ok, Q} -> + E1 = fun () -> timer:sleep(25), with(Name, F, E) end, + rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end); + {error, not_found} -> + E() end. with(Name, F) -> @@ -295,31 +307,58 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> - rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, - [<<"x-expires">>, <<"x-message-ttl">>]). + rabbit_misc:assert_args_equivalence( + Args, RequiredArgs, QueueName, + [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). check_declare_arguments(QueueName, Args) -> - [case Fun(rabbit_misc:table_lookup(Args, Key)) of + [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of ok -> ok; {error, Error} -> rabbit_misc:protocol_error( precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- - [{<<"x-expires">>, fun check_integer_argument/1}, - {<<"x-message-ttl">>, fun check_integer_argument/1}]], + [{<<"x-expires">>, fun check_integer_argument/2}, + {<<"x-message-ttl">>, fun check_integer_argument/2}, + {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]], ok. -check_integer_argument(undefined) -> +check_integer_argument(undefined, _Args) -> ok; -check_integer_argument({Type, Val}) when Val > 0 -> +check_integer_argument({Type, Val}, _Args) when Val > 0 -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; false -> {error, {unacceptable_type, Type}} end; -check_integer_argument({_Type, Val}) -> +check_integer_argument({_Type, Val}, _Args) -> {error, {value_zero_or_less, Val}}. +check_ha_policy_argument(undefined, _Args) -> + ok; +check_ha_policy_argument({longstr, <<"all">>}, _Args) -> + ok; +check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> + case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of + undefined -> + {error, {require, 'x-ha-policy-params'}}; + {array, []} -> + {error, {require_non_empty_list_of_nodes_for_ha}}; + {array, Ary} -> + case lists:all(fun ({longstr, _Node}) -> true; + (_ ) -> false + end, Ary) of + true -> ok; + false -> {error, {require_node_list_as_longstrs_for_ha, Ary}} + end; + {Type, _} -> + {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} + end; +check_ha_policy_argument({longstr, Policy}, _Args) -> + {error, {invalid_ha_policy, Policy}}; +check_ha_policy_argument({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -360,9 +399,6 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). -emit_stats(#amqqueue{pid = QPid}) -> - delegate_cast(QPid, emit_stats). - delete_immediately(#amqqueue{ pid = QPid }) -> gen_server2:cast(QPid, delete_immediately). @@ -383,21 +419,12 @@ deliver(QPid, Delivery) -> requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). -ack(QPid, Txn, MsgIds, ChPid) -> - delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). +ack(QPid, MsgIds, ChPid) -> + delegate_cast(QPid, {ack, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}). -commit_all(QPids, Txn, ChPid) -> - safe_delegate_call_ok( - fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, - QPids). - -rollback_all(QPids, Txn, ChPid) -> - delegate:invoke_no_result( - QPids, fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end). - notify_down_all(QPids, ChPid) -> safe_delegate_call_ok( fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, @@ -448,33 +475,19 @@ internal_delete(QueueName) -> end). run_backing_queue(QPid, Mod, Fun) -> - gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity). - -run_backing_queue_async(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). -sync_timeout(QPid) -> - gen_server2:cast(QPid, sync_timeout). - -update_ram_duration(QPid) -> - gen_server2:cast(QPid, update_ram_duration). - set_ram_duration_target(QPid, Duration) -> gen_server2:cast(QPid, {set_ram_duration_target, Duration}). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -maybe_expire(QPid) -> - gen_server2:cast(QPid, maybe_expire). - -drop_expired(QPid) -> - gen_server2:cast(QPid, drop_expired). - on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} + #amqqueue{name = QueueName, pid = Pid, + slave_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node])), rabbit_binding:process_deletions( @@ -487,11 +500,13 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, - auto_delete = false, - arguments = [], - pid = Pid}. + #amqqueue{name = QueueName, + durable = false, + auto_delete = false, + arguments = [], + pid = Pid, + slave_pids = [], + mirror_nodes = undefined}. safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 07a24af8..05de48d6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -31,7 +31,9 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2]). + prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + +-export([init_with_backing_queue_state/7]). %% Queue's state -record(q, {q, @@ -60,7 +62,6 @@ monitor_ref, acktags, is_limit_active, - txn, unsent_message_count}). -define(STATISTICS_KEYS, @@ -72,7 +73,8 @@ messages, consumers, memory, - backing_queue_status + backing_queue_status, + slave_pids ]). -define(CREATION_EVENT_KEYS, @@ -81,7 +83,8 @@ durable, auto_delete, arguments, - owner_pid + owner_pid, + mirror_nodes ]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -114,6 +117,34 @@ init(Q) -> msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, + RateTRef, AckTags, Deliveries, MTC) -> + ?LOGDEBUG("Queue starting - ~p~n", [Q]), + case Owner of + none -> ok; + _ -> erlang:monitor(process, Owner) + end, + State = requeue_and_run( + AckTags, + process_args( + #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = BQ, + backing_queue_state = BQS, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = RateTRef, + expiry_timer_ref = undefined, + ttl = undefined, + stats_timer = rabbit_event:init_stats_timer(), + msg_id_to_channel = MTC})), + lists:foldl( + fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end, + State, Deliveries). + terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> @@ -161,14 +192,7 @@ bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover, fun (Mod, Fun) -> - rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun) - end, - fun (Mod, Fun) -> - rabbit_misc:with_exit_handler( - fun () -> error end, - fun () -> - rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) - end) + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> @@ -185,22 +209,14 @@ init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). terminate_shutdown(Fun, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), case BQS of - undefined -> State; + undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), - BQS1 = lists:foldl( - fun (#cr{txn = none}, BQSN) -> - BQSN; - (#cr{txn = Txn}, BQSN) -> - {_AckTags, BQSN1} = - BQ:tx_rollback(Txn, BQSN), - BQSN1 - end, BQS, all_ch_record()), [emit_consumer_deleted(Ch, CTag) || {Ch, CTag, _} <- consumers(State1)], - State1#q{backing_queue_state = Fun(BQS1)} + State1#q{backing_queue_state = Fun(BQS)} end. reply(Reply, NewState) -> @@ -225,13 +241,15 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_module(#amqqueue{}) -> - {ok, BQM} = application:get_env(backing_queue_module), - BQM. +backing_queue_module(#amqqueue{arguments = Args}) -> + case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of + undefined -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + _Policy -> rabbit_mirror_queue_master + end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -239,14 +257,12 @@ ensure_sync_timer(State) -> stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> State; stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{sync_timer_ref = undefined}. ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), + TRef = erlang:send_after( + ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration), State#q{rate_timer_ref = TRef}; ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> State#q{rate_timer_ref = undefined}; @@ -258,13 +274,13 @@ stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> State#q{rate_timer_ref = undefined}; stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{rate_timer_ref = undefined}. stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#q{expiry_timer_ref = undefined}. %% We wish to expire only when there are no consumers *and* the expiry @@ -276,18 +292,16 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> case is_unused(State) of true -> NewState = stop_expiry_timer(State), - {ok, TRef} = timer:apply_after( - Expires, rabbit_amqqueue, maybe_expire, [self()]), + TRef = erlang:send_after(Expires, self(), maybe_expire), NewState#q{expiry_timer_ref = TRef}; false -> State end. ensure_stats_timer(State = #q{stats_timer = StatsTimer, - q = Q}) -> + q = #amqqueue{pid = QPid}}) -> State#q{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, - fun() -> rabbit_amqqueue:emit_stats(Q) end)}. + StatsTimer, QPid, emit_stats)}. assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> @@ -308,7 +322,6 @@ ch_record(ChPid) -> monitor_ref = MonitorRef, acktags = sets:new(), is_limit_active = false, - txn = none, unsent_message_count = 0}, put(Key, C), C; @@ -320,13 +333,12 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, acktags = ChAckTags, - txn = Txn, unsent_message_count = UnsentMessageCount}) -> - case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of - {0, 0, 0, none} -> ok = erase_ch_record(C), - false; - _ -> store_ch_record(C), - true + case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of + {0, 0, 0} -> ok = erase_ch_record(C), + false; + _ -> store_ch_record(C), + true end. erase_ch_record(#cr{ch_pid = ChPid, @@ -431,9 +443,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> {CMs, MTC0} end end, {gb_trees:empty(), MTC}, MsgIds), - gb_trees_foreach(fun(ChPid, MsgSeqNos) -> - rabbit_channel:confirm(ChPid, MsgSeqNos) - end, CMs), + gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), State#q{msg_id_to_channel = MTC1}. gb_trees_foreach(_, none) -> @@ -480,8 +490,7 @@ run_message_queue(State) -> {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), State2. -attempt_delivery(Delivery = #delivery{txn = none, - sender = ChPid, +attempt_delivery(Delivery = #delivery{sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -490,7 +499,7 @@ attempt_delivery(Delivery = #delivery{txn = none, immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, - case BQ:is_duplicate(none, Message, BQS) of + case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -522,24 +531,6 @@ attempt_delivery(Delivery = #delivery{txn = none, discarded -> false end, {Delivered, Confirm, State#q{backing_queue_state = BQS1}} - end; -attempt_delivery(Delivery = #delivery{txn = Txn, - sender = ChPid, - message = Message}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - Confirm = should_confirm_message(Delivery, State), - case BQ:is_duplicate(Txn, Message, BQS) of - {false, BQS1} -> - store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, - BQS1), - {true, Confirm, State#q{backing_queue_state = BQS2}}; - {Duplicate, BQS1} -> - Delivered = case Duplicate of - published -> true; - discarded -> false - end, - {Delivered, Confirm, State#q{backing_queue_state = BQS1}} end. deliver_or_enqueue(Delivery = #delivery{message = Message, @@ -619,7 +610,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> {ok, State}; - C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} -> + C = #cr{ch_pid = ChPid, acktags = ChAckTags} -> ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of @@ -632,13 +623,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> ChPid, State#q.blocked_consumers)}, case should_auto_delete(State1) of true -> {stop, State1}; - false -> State2 = case Txn of - none -> State1; - _ -> rollback_transaction(Txn, C, - State1) - end, - {ok, requeue_and_run(sets:to_list(ChAckTags), - ensure_expiry_timer(State2))} + false -> {ok, requeue_and_run(sets:to_list(ChAckTags), + ensure_expiry_timer(State1))} end end. @@ -672,25 +658,6 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). -commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL}) -> - {AckTags, BQS1} = BQ:tx_commit( - Txn, fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(TTL), BQS), - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), - State#q{backing_queue_state = BQS1}. - -rollback_transaction(Txn, C, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), - %% Iff we removed acktags from the channel record on ack+txn then - %% we would add them back in here. - maybe_store_ch_record(C#cr{txn = none}), - State#q{backing_queue_state = BQS1}. - subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). @@ -728,8 +695,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, when TTL =/= undefined -> case BQ:is_empty(BQS) of true -> State; - false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired, - [self()]), + false -> TRef = erlang:send_after(TTL, self(), drop_expired), State#q{ttl_timer_ref = TRef} end; ensure_ttl_timer(State) -> @@ -771,6 +737,12 @@ i(memory, _) -> M; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); +i(slave_pids, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), + SPids; +i(mirror_nodes, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name), + MNodes; i(Item, _) -> throw({bad_argument, Item}). @@ -809,31 +781,32 @@ prioritise_call(Msg, _From, _State) -> info -> 9; {info, _Items} -> 9; consumers -> 9; - {run_backing_queue, _Mod, _Fun} -> 6; _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _AckTags, _ChPid} -> 7; + {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; {run_backing_queue, _Mod, _Fun} -> 6; - sync_timeout -> 6; _ -> 0 end. -prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, - #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8; -prioritise_info(_Msg, _State) -> 0. +prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> + case Msg of + {'DOWN', _, process, DownPid, _} -> 8; + update_ram_duration -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + sync_timeout -> 6; + _ -> 0 + end. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -894,13 +867,6 @@ handle_call({deliver, Delivery}, From, State) -> gen_server2:reply(From, true), noreply(deliver_or_enqueue(Delivery, State)); -handle_call({commit, Txn, ChPid}, From, State) -> - case lookup_ch(ChPid) of - not_found -> reply(ok, State); - C -> noreply(run_message_queue( - commit_transaction(Txn, From, C, State))) - end; - handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the @@ -1040,40 +1006,25 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(requeue_and_run(AckTags, State)) - end; - -handle_call({run_backing_queue, Mod, Fun}, _From, State) -> - reply(ok, run_backing_queue(Mod, Fun, State)). - + end. handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast(sync_timeout, State) -> - noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); - handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. noreply(deliver_or_enqueue(Delivery, State)); -handle_cast({ack, Txn, AckTags, ChPid}, +handle_cast({ack, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - {C1, State1} = - case Txn of - none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - NewC = C#cr{acktags = ChAckTags1}, - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - {NewC, State#q{backing_queue_state = BQS1}}; - _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), - {C#cr{txn = Txn}, - State#q{backing_queue_state = BQS1}} - end, - maybe_store_ch_record(C1), - noreply(State1) + maybe_store_ch_record(C#cr{acktags = subtract_acks( + ChAckTags, AckTags)}), + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + noreply(State#q{backing_queue_state = BQS1}) end; handle_cast({reject, AckTags, Requeue, ChPid}, @@ -1092,12 +1043,6 @@ handle_cast({reject, AckTags, Requeue, ChPid}, end) end; -handle_cast({rollback, Txn, ChPid}, State) -> - noreply(case lookup_ch(ChPid) of - not_found -> State; - C -> rollback_transaction(Txn, C, State) - end); - handle_cast(delete_immediately, State) -> {stop, normal, State}; @@ -1133,15 +1078,6 @@ handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), noreply(State); -handle_cast(update_ram_duration, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State#q{rate_timer_ref = just_measured, - backing_queue_state = BQS2}); - handle_cast({set_ram_duration_target, Duration}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), @@ -1149,24 +1085,24 @@ handle_cast({set_ram_duration_target, Duration}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); + noreply(State). -handle_cast(maybe_expire, State) -> +handle_info(maybe_expire, State) -> case is_unused(State) of true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) end; -handle_cast(drop_expired, State) -> +handle_info(drop_expired, State) -> noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); -handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> +handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, assert_invariant(State1), - {noreply, State1, hibernate}. + {noreply, State1, hibernate}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -1183,6 +1119,18 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; +handle_info(update_ram_duration, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State#q{rate_timer_ref = just_measured, + backing_queue_state = BQS2}); + +handle_info(sync_timeout, State) -> + noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); + handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); @@ -1203,10 +1151,11 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), BQS3 = BQ:handle_pre_hibernate(BQS2), - rabbit_event:if_enabled(StatsTimer, - fun () -> - emit_stats(State, [{idle_since, now()}]) - end), + rabbit_event:if_enabled( + StatsTimer, + fun () -> emit_stats(State, [{idle_since, now()}]) end), State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), backing_queue_state = BQS3}, {hibernate, stop_rate_timer(State1)}. + +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 1344956e..2c28adce 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start_link/0, start_child/1]). +-export([start_link/0, start_child/2]). -export([init/1]). @@ -29,8 +29,8 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_child(Args) -> - supervisor2:start_child(?SERVER, Args). +start_child(Node, Args) -> + supervisor2:start_child({?SERVER, Node}, Args). init([]) -> {ok, {{simple_one_for_one_terminate, 10, 10}, diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl index 09820c5b..ade158bb 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_auth_backend.erl @@ -36,17 +36,13 @@ behaviour_info(callbacks) -> %% Client failed authentication. Log and die. {check_user_login, 2}, - %% Given #user, vhost path and permission, can a user access a vhost? - %% Permission is read - learn of the existence of (only relevant for - %% management plugin) - %% or write - log in - %% + %% Given #user and vhost, can a user log in to a vhost? %% Possible responses: %% true %% false %% {error, Error} %% Something went wrong. Log and die. - {check_vhost_access, 3}, + {check_vhost_access, 2}, %% Given #user, resource and permission, can a user access a resource? %% diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 2a42ff88..6a018bd1 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -20,10 +20,10 @@ -behaviour(rabbit_auth_backend). -export([description/0]). --export([check_user_login/2, check_vhost_access/3, check_resource_access/3]). +-export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). --export([add_user/2, delete_user/1, change_password/2, set_admin/1, - clear_admin/1, list_users/0, lookup_user/1, clear_password/1]). +-export([add_user/2, delete_user/1, change_password/2, set_tags/2, + list_users/0, user_info_keys/0, lookup_user/1, clear_password/1]). -export([make_salt/0, check_password/2, change_password_hash/2, hash_password/1]). -export([set_permissions/5, clear_permissions/2, @@ -50,9 +50,9 @@ rabbit_types:password_hash()) -> 'ok'). -spec(hash_password/1 :: (rabbit_types:password()) -> rabbit_types:password_hash()). --spec(set_admin/1 :: (rabbit_types:username()) -> 'ok'). --spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok'). --spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]). +-spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok'). +-spec(list_users/0 :: () -> rabbit_types:infos()). +-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(lookup_user/1 :: (rabbit_types:username()) -> rabbit_types:ok(rabbit_types:internal_user()) | rabbit_types:error('not_found')). @@ -77,6 +77,7 @@ %%---------------------------------------------------------------------------- -define(PERMS_INFO_KEYS, [configure, write, read]). +-define(USER_INFO_KEYS, [user, tags]). %% Implementation of rabbit_auth_backend @@ -97,10 +98,10 @@ check_user_login(Username, AuthProps) -> internal_check_user_login(Username, Fun) -> Refused = {refused, "user '~s' - invalid credentials", [Username]}, case lookup_user(Username) of - {ok, User = #internal_user{is_admin = IsAdmin}} -> + {ok, User = #internal_user{tags = Tags}} -> case Fun(User) of true -> {ok, #user{username = Username, - is_admin = IsAdmin, + tags = Tags, auth_backend = ?MODULE, impl = User}}; _ -> Refused @@ -109,16 +110,13 @@ internal_check_user_login(Username, Fun) -> Refused end. -check_vhost_access(#user{is_admin = true}, _VHostPath, read) -> - true; - -check_vhost_access(#user{username = Username}, VHostPath, _) -> +check_vhost_access(#user{username = Username}, VHost) -> %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_user_permission, #user_vhost{username = Username, - virtual_host = VHostPath}}) of + virtual_host = VHost}}) of [] -> false; [_R] -> true end @@ -161,7 +159,7 @@ add_user(Username, Password) -> #internal_user{username = Username, password_hash = hash_password(Password), - is_admin = false}, + tags = []}, write); _ -> mnesia:abort({user_already_exists, Username}) @@ -222,16 +220,12 @@ salted_md5(Salt, Cleartext) -> Salted = <<Salt/binary, Cleartext/binary>>, erlang:md5(Salted). -set_admin(Username) -> set_admin(Username, true). - -clear_admin(Username) -> set_admin(Username, false). - -set_admin(Username, IsAdmin) -> +set_tags(Username, Tags) -> R = update_user(Username, fun(User) -> - User#internal_user{is_admin = IsAdmin} + User#internal_user{tags = Tags} end), - rabbit_log:info("Set user admin flag for user ~p to ~p~n", - [Username, IsAdmin]), + rabbit_log:info("Set user tags for user ~p to ~p~n", + [Username, Tags]), R. update_user(Username, Fun) -> @@ -244,10 +238,12 @@ update_user(Username, Fun) -> end)). list_users() -> - [{Username, IsAdmin} || - #internal_user{username = Username, is_admin = IsAdmin} <- + [[{user, Username}, {tags, Tags}] || + #internal_user{username = Username, tags = Tags} <- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. +user_info_keys() -> ?USER_INFO_KEYS. + lookup_user(Username) -> rabbit_misc:dirty_read({rabbit_user, Username}). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 217ad3eb..77278416 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -44,9 +44,7 @@ behaviour_info(callbacks) -> %% makes it useful for passing messages back into the backing %% queue, especially as the backing queue does not have %% control of its own mailbox. - %% 4. a synchronous callback. Same as the asynchronous callback - %% but waits for completion and returns 'error' on error. - {init, 4}, + {init, 3}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 2}, @@ -107,21 +105,6 @@ behaviour_info(callbacks) -> %% about. Must return 1 msg_id per Ack, in the same order as Acks. {ack, 2}, - %% A publish, but in the context of a transaction. - {tx_publish, 5}, - - %% Acks, but in the context of a transaction. - {tx_ack, 3}, - - %% Undo anything which has been done in the context of the - %% specified transaction. - {tx_rollback, 2}, - - %% Commit a transaction. The Fun passed in must be called once - %% the messages have really been commited. This CPS permits the - %% possibility of commit coalescing. - {tx_commit, 4}, - %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. {requeue, 3}, @@ -175,7 +158,7 @@ behaviour_info(callbacks) -> %% the BQ to signal that it's already seen this message (and in %% what capacity - i.e. was it published previously or discarded %% previously) and thus the message should be dropped. - {is_duplicate, 3}, + {is_duplicate, 2}, %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ for some diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl new file mode 100644 index 00000000..22691ef9 --- /dev/null +++ b/src/rabbit_backing_queue_qc.erl @@ -0,0 +1,392 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_backing_queue_qc). +-ifdef(use_proper_qc). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). +-include_lib("proper/include/proper.hrl"). + +-behaviour(proper_statem). + +-define(BQMOD, rabbit_variable_queue). +-define(QUEUE_MAXLEN, 10000). +-define(TIMEOUT_LIMIT, 100). + +-define(RECORD_INDEX(Key, Record), + proplists:get_value(Key, lists:zip( + record_info(fields, Record), lists:seq(2, record_info(size, Record))))). + +-export([initial_state/0, command/1, precondition/2, postcondition/3, + next_state/3]). + +-export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]). + +-record(state, {bqstate, + len, %% int + messages, %% queue of {msg_props, basic_msg} + acks, %% dict of acktag => {msg_props, basic_msg} + confirms}). %% set of msgid + +%% Initialise model + +initial_state() -> + #state{bqstate = qc_variable_queue_init(qc_test_queue()), + len = 0, + messages = queue:new(), + acks = orddict:new(), + confirms = gb_sets:new()}. + +%% Property + +prop_backing_queue_test() -> + ?FORALL(Cmds, commands(?MODULE, initial_state()), + backing_queue_test(Cmds)). + +backing_queue_test(Cmds) -> + {ok, FileSizeLimit} = + application:get_env(rabbit, msg_store_file_size_limit), + application:set_env(rabbit, msg_store_file_size_limit, 512, + infinity), + {ok, MaxJournal} = + application:get_env(rabbit, queue_index_max_journal_entries), + application:set_env(rabbit, queue_index_max_journal_entries, 128, + infinity), + + {_H, #state{bqstate = BQ}, Res} = run_commands(?MODULE, Cmds), + + application:set_env(rabbit, msg_store_file_size_limit, + FileSizeLimit, infinity), + application:set_env(rabbit, queue_index_max_journal_entries, + MaxJournal, infinity), + + ?BQMOD:delete_and_terminate(shutdown, BQ), + ?WHENFAIL( + io:format("Result: ~p~n", [Res]), + aggregate(command_names(Cmds), Res =:= ok)). + +%% Commands + +%% Command frequencies are tuned so that queues are normally reasonably +%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple +%% and purging cause extreme queue lengths, so these have lower probabilities. +%% Fetches are sufficiently frequent so that commands that need acktags +%% get decent coverage. + +command(S) -> + frequency([{10, qc_publish(S)}, + {1, qc_publish_delivered(S)}, + {1, qc_publish_multiple(S)}, %% very slow + {15, qc_fetch(S)}, %% needed for ack and requeue + {15, qc_ack(S)}, + {15, qc_requeue(S)}, + {3, qc_set_ram_duration_target(S)}, + {1, qc_ram_duration(S)}, + {1, qc_drain_confirmed(S)}, + {1, qc_dropwhile(S)}, + {1, qc_is_empty(S)}, + {1, qc_timeout(S)}, + {1, qc_purge(S)}]). + +qc_publish(#state{bqstate = BQ}) -> + {call, ?BQMOD, publish, + [qc_message(), + #message_properties{needs_confirming = frequency([{1, true}, + {20, false}]), + expiry = oneof([undefined | lists:seq(1, 10)])}, + self(), BQ]}. + +qc_publish_multiple(#state{bqstate = BQ}) -> + {call, ?MODULE, publish_multiple, + [qc_message(), #message_properties{}, BQ, + resize(?QUEUE_MAXLEN, pos_integer())]}. + +qc_publish_delivered(#state{bqstate = BQ}) -> + {call, ?BQMOD, publish_delivered, + [boolean(), qc_message(), #message_properties{}, self(), BQ]}. + +qc_fetch(#state{bqstate = BQ}) -> + {call, ?BQMOD, fetch, [boolean(), BQ]}. + +qc_ack(#state{bqstate = BQ, acks = Acks}) -> + {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}. + +qc_requeue(#state{bqstate = BQ, acks = Acks}) -> + {call, ?BQMOD, requeue, + [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. + +qc_set_ram_duration_target(#state{bqstate = BQ}) -> + {call, ?BQMOD, set_ram_duration_target, + [oneof([0, 1, 2, resize(1000, pos_integer()), infinity]), BQ]}. + +qc_ram_duration(#state{bqstate = BQ}) -> + {call, ?BQMOD, ram_duration, [BQ]}. + +qc_drain_confirmed(#state{bqstate = BQ}) -> + {call, ?BQMOD, drain_confirmed, [BQ]}. + +qc_dropwhile(#state{bqstate = BQ}) -> + {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}. + +qc_is_empty(#state{bqstate = BQ}) -> + {call, ?BQMOD, is_empty, [BQ]}. + +qc_timeout(#state{bqstate = BQ}) -> + {call, ?MODULE, timeout, [BQ, ?TIMEOUT_LIMIT]}. + +qc_purge(#state{bqstate = BQ}) -> + {call, ?BQMOD, purge, [BQ]}. + +%% Preconditions + +precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg}) + when Fun =:= ack; Fun =:= requeue -> + orddict:size(Acks) > 0; +precondition(#state{messages = Messages}, + {call, ?BQMOD, publish_delivered, _Arg}) -> + queue:is_empty(Messages); +precondition(_S, {call, ?BQMOD, _Fun, _Arg}) -> + true; +precondition(_S, {call, ?MODULE, timeout, _Arg}) -> + true; +precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) -> + Len < ?QUEUE_MAXLEN. + +%% Model updates + +next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> + #state{len = Len, messages = Messages, confirms = Confirms} = S, + MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, + NeedsConfirm = + {call, erlang, element, + [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]}, + S#state{bqstate = BQ, + len = Len + 1, + messages = queue:in({MsgProps, Msg}, Messages), + confirms = case eval(NeedsConfirm) of + true -> gb_sets:add(MsgId, Confirms); + _ -> Confirms + end}; + +next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) -> + #state{len = Len, messages = Messages} = S, + Messages1 = repeat(Messages, fun(Msgs) -> + queue:in({MsgProps, Msg}, Msgs) + end, Count), + S#state{bqstate = BQ, + len = Len + Count, + messages = Messages1}; + +next_state(S, Res, + {call, ?BQMOD, publish_delivered, + [AckReq, Msg, MsgProps, _Pid, _BQ]}) -> + #state{confirms = Confirms, acks = Acks} = S, + AckTag = {call, erlang, element, [1, Res]}, + BQ1 = {call, erlang, element, [2, Res]}, + MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, + NeedsConfirm = + {call, erlang, element, + [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]}, + S#state{bqstate = BQ1, + confirms = case eval(NeedsConfirm) of + true -> gb_sets:add(MsgId, Confirms); + _ -> Confirms + end, + acks = case AckReq of + true -> orddict:append(AckTag, {MsgProps, Msg}, Acks); + false -> Acks + end + }; + +next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) -> + #state{len = Len, messages = Messages, acks = Acks} = S, + ResultInfo = {call, erlang, element, [1, Res]}, + BQ1 = {call, erlang, element, [2, Res]}, + AckTag = {call, erlang, element, [3, ResultInfo]}, + S1 = S#state{bqstate = BQ1}, + case queue:out(Messages) of + {empty, _M2} -> + S1; + {{value, MsgProp_Msg}, M2} -> + S2 = S1#state{len = Len - 1, messages = M2}, + case AckReq of + true -> + S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)}; + false -> + S2 + end + end; + +next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> + #state{acks = AcksState} = S, + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1, + acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)}; + +next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) -> + #state{len = Len, messages = Messages, acks = AcksState} = S, + BQ1 = {call, erlang, element, [2, Res]}, + RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) || + Key <- AcksArg]), + S#state{bqstate = BQ1, + len = Len + length(RequeueMsgs), + messages = queue:join(Messages, queue:from_list(RequeueMsgs)), + acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)}; + +next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) -> + S#state{bqstate = BQ}; + +next_state(S, Res, {call, ?BQMOD, ram_duration, _Args}) -> + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1}; + +next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1}; + +next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) -> + #state{messages = Messages} = S, + Messages1 = drop_messages(Messages), + S#state{bqstate = BQ1, len = queue:len(Messages1), messages = Messages1}; + +next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) -> + S; + +next_state(S, BQ, {call, ?MODULE, timeout, _Args}) -> + S#state{bqstate = BQ}; + +next_state(S, Res, {call, ?BQMOD, purge, _Args}) -> + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1, len = 0, messages = queue:new()}. + +%% Postconditions + +postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> + #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, + case Res of + {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} -> + {_MsgProps, Msg} = queue:head(Messages), + MsgFetched =:= Msg andalso + not orddict:is_key(AckTag, Acks) andalso + not gb_sets:is_element(AckTag, Confrms) andalso + RemainingLen =:= Len - 1; + {empty, _BQ} -> + Len =:= 0 + end; + +postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) -> + #state{acks = Acks, confirms = Confrms} = S, + not orddict:is_key(AckTag, Acks) andalso + not gb_sets:is_element(AckTag, Confrms); + +postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) -> + {PurgeCount, _BQ} = Res, + Len =:= PurgeCount; + +postcondition(#state{len = Len}, + {call, ?BQMOD, is_empty, _Args}, Res) -> + (Len =:= 0) =:= Res; + +postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) -> + #state{confirms = Confirms} = S, + {ReportedConfirmed, _BQ} = Res, + lists:all(fun (M) -> + gb_sets:is_element(M, Confirms) + end, ReportedConfirmed); + +postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> + ?BQMOD:len(BQ) =:= Len. + +%% Helpers + +repeat(Result, _Fun, 0) -> + Result; +repeat(Result, Fun, Times) -> + repeat(Fun(Result), Fun, Times - 1). + +publish_multiple(Msg, MsgProps, BQ, Count) -> + repeat(BQ, fun(BQ1) -> + ?BQMOD:publish(Msg, MsgProps, self(), BQ1) + end, Count). + +timeout(BQ, 0) -> + BQ; +timeout(BQ, AtMost) -> + case ?BQMOD:needs_timeout(BQ) of + false -> BQ; + _ -> timeout(?BQMOD:timeout(BQ), AtMost - 1) + end. + +qc_message_payload() -> + ?SIZED(Size, resize(Size * Size, binary())). + +qc_routing_key() -> + noshrink(binary(10)). + +qc_delivery_mode() -> + oneof([1, 2]). + +qc_message() -> + qc_message(qc_delivery_mode()). + +qc_message(DeliveryMode) -> + {call, rabbit_basic, message, [ + qc_default_exchange(), + qc_routing_key(), + #'P_basic'{delivery_mode = DeliveryMode}, + qc_message_payload()]}. + +qc_default_exchange() -> + {call, rabbit_misc, r, [<<>>, exchange, <<>>]}. + +qc_variable_queue_init(Q) -> + {call, ?BQMOD, init, + [Q, false, function(2, ok)]}. + +qc_test_q() -> + {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}. + +qc_test_queue() -> + qc_test_queue(boolean()). + +qc_test_queue(Durable) -> + #amqqueue{name = qc_test_q(), + durable = Durable, + auto_delete = false, + arguments = [], + pid = self()}. + +rand_choice([]) -> []; +rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)]. + +dropfun(Props) -> + Expiry = eval({call, erlang, element, + [?RECORD_INDEX(expiry, message_properties), Props]}), + Expiry =/= 1. + +drop_messages(Messages) -> + case queue:out(Messages) of + {empty, _} -> + Messages; + {{value, MsgProps_Msg}, M2} -> + MsgProps = {call, erlang, element, [1, MsgProps_Msg]}, + case dropfun(MsgProps) of + true -> drop_messages(M2); + false -> Messages + end + end. + +-endif. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index fa7e3a5a..9cc406e7 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,8 +18,8 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/3, message/4, properties/1, delivery/5]). --export([publish/4, publish/7]). +-export([publish/1, message/3, message/4, properties/1, delivery/4]). +-export([publish/4, publish/6]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -37,9 +37,8 @@ -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/5 :: - (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message(), undefined | integer()) -> +-spec(delivery/4 :: + (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), @@ -53,10 +52,9 @@ -spec(publish/4 :: (exchange_input(), rabbit_router:routing_key(), properties_input(), body_input()) -> publish_result()). --spec(publish/7 :: +-spec(publish/6 :: (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), - rabbit_types:maybe(rabbit_types:txn()), properties_input(), - body_input()) -> publish_result()). + properties_input(), body_input()) -> publish_result()). -spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> @@ -73,9 +71,9 @@ publish(Delivery = #delivery{ Other -> Other end. -delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> - #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message, msg_seq_no = MsgSeqNo}. +delivery(Mandatory, Immediate, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(), + message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> build_content(Properties, [BodyBin]); @@ -157,24 +155,23 @@ indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. publish(Exchange, RoutingKeyBin, Properties, Body) -> - publish(Exchange, RoutingKeyBin, false, false, none, Properties, - Body). + publish(Exchange, RoutingKeyBin, false, false, Properties, Body). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Txn, - Props, Body) -> - publish(X, delivery(Mandatory, Immediate, Txn, +publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) -> + publish(X, delivery(Mandatory, Immediate, message(XName, RKey, properties(Props), Body), undefined)); -publish(XName, RKey, Mandatory, Immediate, Txn, Props, Body) -> +publish(XName, RKey, Mandatory, Immediate, Props, Body) -> case rabbit_exchange:lookup(XName) of - {ok, X} -> publish(X, RKey, Mandatory, Immediate, Txn, Props, Body); + {ok, X} -> publish(X, RKey, Mandatory, Immediate, Props, Body); Err -> Err end. publish(X, Delivery) -> - {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), + {RoutingRes, DeliveredQPids} = + rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery), {ok, RoutingRes, DeliveredQPids}. is_message_persistent(#content{properties = #'P_basic'{ diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 5873537c..205d5bba 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -94,8 +94,6 @@ routing_key, arguments]). recover(XNames, QNames) -> - XNameSet = sets:from_list(XNames), - QNameSet = sets:from_list(QNames), rabbit_misc:table_filter( fun (Route) -> mnesia:read({rabbit_semi_durable_route, Route}) =:= [] @@ -105,27 +103,33 @@ recover(XNames, QNames) -> (_Route, false) -> ok end, rabbit_durable_route), - rabbit_misc:table_filter( - fun (#route{binding = #binding{destination = Dst = - #resource{kind = Kind}}}) -> - sets:is_element(Dst, case Kind of - exchange -> XNameSet; - queue -> QNameSet - end) - end, - fun (R = #route{binding = B = #binding{source = Src}}, Tx) -> - {ok, X} = rabbit_exchange:lookup(Src), - Serial = case Tx of - true -> ok = sync_transient_route( - R, fun mnesia:write/3), - transaction; - false -> rabbit_exchange:serial(X) - end, - rabbit_exchange:callback(X, add_binding, [Serial, X, B]) - end, - rabbit_semi_durable_route), + XNameSet = sets:from_list(XNames), + QNameSet = sets:from_list(QNames), + SelectSet = fun (#resource{kind = exchange}) -> XNameSet; + (#resource{kind = queue}) -> QNameSet + end, + [recover_semi_durable_route(R, SelectSet(Dst)) || + R = #route{binding = #binding{destination = Dst}} <- + rabbit_misc:dirty_read_all(rabbit_semi_durable_route)], ok. +recover_semi_durable_route(R = #route{binding = B}, ToRecover) -> + #binding{source = Src, destination = Dst} = B, + {ok, X} = rabbit_exchange:lookup(Src), + rabbit_misc:execute_mnesia_transaction( + fun () -> + Rs = mnesia:match_object(rabbit_semi_durable_route, R, read), + case Rs =/= [] andalso sets:is_element(Dst, ToRecover) of + false -> no_recover; + true -> ok = sync_transient_route(R, fun mnesia:write/3), + rabbit_exchange:serial(X) + end + end, + fun (no_recover, _) -> ok; + (_Serial, true) -> x_callback(transaction, X, add_binding, B); + (Serial, false) -> x_callback(Serial, X, add_binding, B) + end). + exists(Binding) -> binding_action( Binding, fun (_Src, _Dst, B) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 991b0b06..45f0032d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -23,15 +23,15 @@ -export([start_link/10, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([refresh_config_all/0, emit_stats/1, ready_for_close/1]). +-export([refresh_config_all/0, ready_for_close/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2]). + prioritise_cast/2, prioritise_info/2, format_message_queue/2]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter_pid, start_limiter_fun, transaction_id, tx_participants, - next_tag, uncommitted_ack_q, unacked_message_q, + limiter_pid, start_limiter_fun, tx_status, next_tag, + unacked_message_q, uncommitted_message_q, uncommitted_ack_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, @@ -46,6 +46,7 @@ consumer_count, messages_unacknowledged, messages_unconfirmed, + messages_uncommitted, acks_uncommitted, prefetch_count, client_flow_blocked]). @@ -90,7 +91,6 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(refresh_config_all/0 :: () -> 'ok'). --spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(ready_for_close/1 :: (pid()) -> 'ok'). -endif. @@ -152,9 +152,6 @@ refresh_config_all() -> fun (C) -> gen_server2:call(C, refresh_config) end, list()), ok. -emit_stats(Pid) -> - gen_server2:cast(Pid, emit_stats). - ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). @@ -173,11 +170,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, conn_pid = ConnPid, limiter_pid = undefined, start_limiter_fun = StartLimiterFun, - transaction_id = none, - tx_participants = sets:new(), + tx_status = none, next_tag = 1, - uncommitted_ack_q = queue:new(), unacked_message_q = queue:new(), + uncommitted_message_q = queue:new(), + uncommitted_ack_q = queue:new(), user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -195,7 +192,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, trace_state = rabbit_trace:init(VHost)}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, - fun() -> internal_emit_stats(State) end), + fun() -> emit_stats(State) end), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -208,11 +205,16 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - emit_stats -> 7; {confirm, _MsgSeqNos, _QPid} -> 5; _ -> 0 end. +prioritise_info(Msg, _State) -> + case Msg of + emit_stats -> 7; + _ -> 0 + end. + handle_call(flush, _From, State) -> reply(ok, State); @@ -286,20 +288,14 @@ handle_cast({deliver, ConsumerTag, AckRequired, exchange = ExchangeName#resource.name, routing_key = RoutingKey}, rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - - maybe_incr_stats([{QPid, 1}], - case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State), + maybe_incr_stats([{QPid, 1}], case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State), + maybe_incr_redeliver_stats(Redelivered, QPid, State), rabbit_trace:tap_trace_out(Msg, TraceState), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> - internal_emit_stats(State), - noreply([ensure_stats_timer], - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); - handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). @@ -307,6 +303,11 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); +handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> + emit_stats(State), + noreply([ensure_stats_timer], + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); + handle_info({'DOWN', MRef, process, QPid, Reason}, State = #ch{consumer_monitors = ConsumerMonitors}) -> noreply( @@ -322,16 +323,13 @@ handle_info({'EXIT', _Pid, Reason}, State) -> handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - rabbit_event:if_enabled(StatsTimer, - fun () -> - internal_emit_stats( - State, [{idle_since, now()}]) - end), + rabbit_event:if_enabled( + StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end), StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), {hibernate, State#ch{stats_timer = StatsTimer1}}. terminate(Reason, State) -> - {Res, _State1} = rollback_and_notify(State), + {Res, _State1} = notify_queues(State), case Reason of normal -> ok = Res; shutdown -> ok = Res; @@ -344,6 +342,8 @@ terminate(Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> reply(Reply, [], NewState). @@ -368,8 +368,7 @@ next_state(Mask, State) -> ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), State#ch{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, - fun() -> emit_stats(ChPid) end)}. + StatsTimer, ChPid, emit_stats)}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -386,8 +385,8 @@ send_exception(Reason, State = #ch{protocol = Protocol, rabbit_binary_generator:map_exception(Channel, Reason, Protocol), rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", [ConnPid, Channel, Reason]), - %% something bad's happened: rollback_and_notify may not be 'ok' - {_Result, State1} = rollback_and_notify(State), + %% something bad's happened: notify_queues may not be 'ok' + {_Result, State1} = notify_queues(State), case CloseChannel of Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod), {noreply, State1}; @@ -538,17 +537,13 @@ process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ, fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) -> case gb_trees:lookup(MsgSeqNo, UMQ0) of {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack, State); + Acc, Nack); none -> Acc end end, {[], UMQ, UQM}, MsgSeqNos), {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, - State) -> - %% these confirms will be emitted even when a queue dies, but that - %% should be fine, since the queue stats get erased immediately - maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) -> UQM1 = case gb_trees:lookup(QPid, UQM) of {value, MsgSeqNos} -> MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), @@ -589,10 +584,19 @@ handle_method(_Method, _, State = #ch{state = closing}) -> {noreply, State}; handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> - {ok, State1} = rollback_and_notify(State), + {ok, State1} = notify_queues(State), ReaderPid ! {channel_closing, self()}, {noreply, State1}; +%% Even though the spec prohibits the client from sending commands +%% while waiting for the reply to a synchronous command, we generally +%% do allow this...except in the case of a pending tx.commit, where +%% it could wreak havoc. +handle_method(_Method, _, #ch{tx_status = TxStatus}) + when TxStatus =/= none andalso TxStatus =/= in_progress -> + rabbit_misc:protocol_error( + channel_error, "unexpected command while processing 'tx.commit'", []); + handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; @@ -601,7 +605,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, mandatory = Mandatory, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, - transaction_id = TxnKey, + tx_status = TxStatus, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -613,29 +617,24 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(DecodedContent#content.properties, State), {MsgSeqNo, State1} = - case ConfirmEnabled of - false -> {undefined, State}; - true -> SeqNo = State#ch.publish_seqno, - {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + case {TxStatus, ConfirmEnabled} of + {none, false} -> {undefined, State}; + {_, _} -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> rabbit_trace:tap_trace_in(Message, TraceState), - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish( - Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, - MsgSeqNo)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, - ExchangeName, MsgSeqNo, Message, - State1), - maybe_incr_stats([{ExchangeName, 1} | - [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State2), - {noreply, case TxnKey of - none -> State2; - _ -> add_tx_participants(DeliveredQPids, State2) - end}; + Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message, + MsgSeqNo), + QNames = rabbit_exchange:route(Exchange, Delivery), + {noreply, + case TxStatus of + none -> deliver_to_queues({Delivery, QNames}, State1); + in_progress -> TMQ = State1#ch.uncommitted_message_q, + NewTMQ = queue:in({Delivery, QNames}, TMQ), + State1#ch{uncommitted_message_q = NewTMQ} + end}; {error, Reason} -> rabbit_misc:protocol_error(precondition_failed, "invalid message: ~p", [Reason]) @@ -649,22 +648,16 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, - _, State = #ch{transaction_id = TxnKey, - unacked_message_q = UAMQ}) -> + _, State = #ch{unacked_message_q = UAMQ, + tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - QIncs = ack(TxnKey, Acked), - Participants = [QPid || {QPid, _} <- QIncs], - maybe_incr_stats(QIncs, ack, State), - {noreply, case TxnKey of - none -> ok = notify_limiter(State#ch.limiter_pid, Acked), - State#ch{unacked_message_q = Remaining}; - _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, - Acked), - add_tx_participants( - Participants, - State#ch{unacked_message_q = Remaining, - uncommitted_ack_q = NewUAQ}) - end}; + State1 = State#ch{unacked_message_q = Remaining}, + {noreply, + case TxStatus of + none -> ack(Acked, State1); + in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked), + State1#ch{uncommitted_ack_q = NewTAQ} + end}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, @@ -685,11 +678,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), State), - maybe_incr_stats([{QPid, 1}], - case NoAck of - true -> get_no_ack; - false -> get - end, State), + maybe_incr_stats([{QPid, 1}], case NoAck of + true -> get_no_ack; + false -> get + end, State), + maybe_incr_redeliver_stats(Redelivered, QPid, State), rabbit_trace:tap_trace_out(Msg, TraceState), ok = rabbit_writer:send_command( WriterPid, @@ -894,7 +887,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configure_permitted(ExchangeName, State), check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName), return_ok(State, NoWait, #'exchange.declare_ok'{}); @@ -990,7 +982,6 @@ handle_method(#'queue.declare'{queue = QueueNameBin, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), - check_configure_permitted(QueueName, State), {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), @@ -1047,33 +1038,33 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); - handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> rabbit_misc:protocol_error( precondition_failed, "cannot switch from confirm to tx mode", []); -handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> - {reply, #'tx.select_ok'{}, new_tx(State)}; - handle_method(#'tx.select'{}, _, State) -> - {reply, #'tx.select_ok'{}, State}; + {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}}; -handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) -> +handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.commit'{}, _, State) -> - {reply, #'tx.commit_ok'{}, internal_commit(State)}; +handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, + uncommitted_ack_q = TAQ}) -> + State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2, + State, TMQ))), + {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; -handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> +handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.rollback'{}, _, State) -> - {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; +handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, + uncommitted_ack_q = TAQ}) -> + {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = + queue:join(TAQ, UAMQ)})}; -handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) - when TxId =/= none -> +handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> rabbit_misc:protocol_error( precondition_failed, "cannot switch from tx to confirm mode", []); @@ -1139,10 +1130,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> %% process_confirms to prevent each MsgSeqNo being removed from %% the set one by one which which would be inefficient State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, - {Nack, SendFun} = case Reason of - normal -> {false, fun record_confirms/2}; - _ -> {true, fun send_nacks/2} - end, + {Nack, SendFun} = + case Reason of + Reason when Reason =:= noproc; Reason =:= noconnection; + Reason =:= normal; Reason =:= shutdown -> + {false, fun record_confirms/2}; + {shutdown, _} -> + {false, fun record_confirms/2}; + _ -> + {true, fun send_nacks/2} + end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), erase_queue_stats(QPid), State3 = SendFun(MXs, State2), @@ -1252,55 +1249,24 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> precondition_failed, "unknown delivery tag ~w", [DeliveryTag]) end. -add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> - State#ch{tx_participants = sets:union(Participants, - sets:from_list(MoreP))}. - -ack(TxnKey, UAQ) -> - fold_per_queue( - fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), - [{QPid, length(MsgIds)} | L] - end, [], UAQ). - -make_tx_id() -> rabbit_guid:guid(). - -new_tx(State) -> - State#ch{transaction_id = make_tx_id(), - tx_participants = sets:new(), - uncommitted_ack_q = queue:new()}. - -internal_commit(State = #ch{transaction_id = TxnKey, - tx_participants = Participants}) -> - case rabbit_amqqueue:commit_all(sets:to_list(Participants), - TxnKey, self()) of - ok -> ok = notify_limiter(State#ch.limiter_pid, - State#ch.uncommitted_ack_q), - new_tx(State); - {error, Errors} -> rabbit_misc:protocol_error( - internal_error, "commit failed: ~w", [Errors]) - end. +ack(Acked, State) -> + QIncs = fold_per_queue( + fun (QPid, MsgIds, L) -> + ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), + [{QPid, length(MsgIds)} | L] + end, [], Acked), + maybe_incr_stats(QIncs, ack, State), + ok = notify_limiter(State#ch.limiter_pid, Acked), + State. -internal_rollback(State = #ch{transaction_id = TxnKey, - tx_participants = Participants, - uncommitted_ack_q = UAQ, - unacked_message_q = UAMQ}) -> - ?LOGDEBUG("rollback ~p~n - ~p acks uncommitted, ~p messages unacked~n", - [self(), - queue:len(UAQ), - queue:len(UAMQ)]), - ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey, self()), - NewUAMQ = queue:join(UAQ, UAMQ), - new_tx(State#ch{unacked_message_q = NewUAMQ}). - -rollback_and_notify(State = #ch{state = closing}) -> +new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), + uncommitted_ack_q = queue:new()}. + +notify_queues(State = #ch{state = closing}) -> {ok, State}; -rollback_and_notify(State = #ch{transaction_id = none}) -> - {notify_queues(State), State#ch{state = closing}}; -rollback_and_notify(State) -> - State1 = internal_rollback(State), - {notify_queues(State1), State1#ch{state = closing}}. +notify_queues(State = #ch{consumer_mapping = Consumers}) -> + {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), + State#ch{state = closing}}. fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( @@ -1319,9 +1285,6 @@ start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) -> ok = limit_queues(LPid, State), LPid. -notify_queues(#ch{consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). - unlimit_queues(State) -> ok = limit_queues(undefined, State), undefined. @@ -1348,6 +1311,18 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. +deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ + exchange_name = XName}, + msg_seq_no = MsgSeqNo}, + QNames}, State) -> + {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery), + State1 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, State), + maybe_incr_stats([{XName, 1} | + [{{QPid, XName}, 1} || + QPid <- DeliveredQPids]], publish, State1), + State1. + process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], @@ -1355,8 +1330,7 @@ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), - maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_not_delivered, State), + maybe_incr_stats([{XName, 1}], return_not_delivered, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1386,20 +1360,25 @@ lock_message(false, _MsgStruct, State) -> send_nacks([], State) -> State; -send_nacks(MXs, State) -> +send_nacks(MXs, State = #ch{tx_status = none}) -> MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ], coalesce_and_send(MsgSeqNos, fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, multiple = Multiple} - end, State). + end, State); +send_nacks(_, State) -> + maybe_complete_tx(State#ch{tx_status = failed}). -send_confirms(State = #ch{confirmed = C}) -> +send_confirms(State = #ch{tx_status = none, confirmed = C}) -> C1 = lists:append(C), MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), MsgSeqNo end || {MsgSeqNo, ExchangeName} <- C1 ], - send_confirms(MsgSeqNos, State #ch{confirmed = []}). + send_confirms(MsgSeqNos, State #ch{confirmed = []}); +send_confirms(State) -> + maybe_complete_tx(State). + send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> @@ -1429,6 +1408,25 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. +maybe_complete_tx(State = #ch{tx_status = in_progress}) -> + State; +maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) -> + case gb_trees:is_empty(UMQ) of + false -> State; + true -> complete_tx(State#ch{confirmed = []}) + end. + +complete_tx(State = #ch{tx_status = committing}) -> + ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}), + State#ch{tx_status = in_progress}; +complete_tx(State = #ch{tx_status = failed}) -> + {noreply, State1} = send_exception( + rabbit_misc:amqp_error( + precondition_failed, "partial tx completion", [], + 'tx.commit'), + State), + State1#ch{tx_status = in_progress}. + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); @@ -1436,17 +1434,18 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(transactional, #ch{tx_status = TE}) -> TE =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> gb_trees:size(UMQ); -i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, - uncommitted_ack_q = UAQ}) -> - queue:len(UAMQ) + queue:len(UAQ); -i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) -> - queue:len(UAQ); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> + queue:len(UAMQ); +i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> + queue:len(TMQ); +i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) -> + queue:len(TAQ); i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> rabbit_limiter:get_limit(LimiterPid); i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) -> @@ -1454,6 +1453,11 @@ i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) -> i(Item, _) -> throw({bad_argument, Item}). +maybe_incr_redeliver_stats(true, QPid, State) -> + maybe_incr_stats([{QPid, 1}], redeliver, State); +maybe_incr_redeliver_stats(_, _, _) -> + ok. + maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> case rabbit_event:stats_level(StatsTimer) of fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; @@ -1488,10 +1492,10 @@ update_measures(Type, QX, Inc, Measure) -> put({Type, QX}, orddict:store(Measure, Cur + Inc, Measures)). -internal_emit_stats(State) -> - internal_emit_stats(State, []). +emit_stats(State) -> + emit_stats(State, []). -internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> +emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> CoarseStats = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(StatsTimer) of coarse -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 355ac549..e8afed0c 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -17,7 +17,7 @@ -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/5, diagnostics/1]). +-export([start/0, stop/0, action/5, diagnostics/1, log_action/3]). -define(RPC_TIMEOUT, infinity). -define(WAIT_FOR_VM_ATTEMPTS, 5). @@ -51,6 +51,7 @@ -> 'ok'). -spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]). -spec(usage/0 :: () -> no_return()). +-spec(log_action/3 :: (node(), string(), [term()]) -> ok). -endif. @@ -73,6 +74,7 @@ start() -> Command = list_to_atom(Command0), Quiet = proplists:get_bool(?QUIET_OPT, Opts1), Node = proplists:get_value(?NODE_OPT, Opts1), + rpc_call(Node, rabbit_control, log_action, [node(), Command0, Args]), Inform = case Quiet of true -> fun (_Format, _Args1) -> ok end; false -> fun (Format, Args1) -> @@ -235,17 +237,17 @@ action(clear_password, Node, Args = [Username], _Opts, Inform) -> Inform("Clearing password for user ~p", [Username]), call(Node, {rabbit_auth_backend_internal, clear_password, Args}); -action(set_admin, Node, [Username], _Opts, Inform) -> - Inform("Setting administrative status for user ~p", [Username]), - call(Node, {rabbit_auth_backend_internal, set_admin, [Username]}); - -action(clear_admin, Node, [Username], _Opts, Inform) -> - Inform("Clearing administrative status for user ~p", [Username]), - call(Node, {rabbit_auth_backend_internal, clear_admin, [Username]}); +action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) -> + Tags = [list_to_atom(T) || T <- TagsStr], + Inform("Setting tags for user ~p to ~p", [Username, Tags]), + rpc_call(Node, rabbit_auth_backend_internal, set_tags, + [list_to_binary(Username), Tags]); action(list_users, Node, [], _Opts, Inform) -> Inform("Listing users", []), - display_list(call(Node, {rabbit_auth_backend_internal, list_users, []})); + display_info_list( + call(Node, {rabbit_auth_backend_internal, list_users, []}), + rabbit_auth_backend_internal:user_info_keys()); action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Creating vhost ~p", Args), @@ -301,7 +303,7 @@ action(list_connections, Node, Args, _Opts, Inform) -> action(list_channels, Node, Args, _Opts, Inform) -> Inform("Listing channels", []), - ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count, + ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, messages_unacknowledged]), display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), ArgAtoms); @@ -422,17 +424,6 @@ format_info_item([T | _] = Value) format_info_item(Value) -> io_lib:format("~w", [Value]). -display_list(L) when is_list(L) -> - lists:foreach(fun (I) when is_binary(I) -> - io:format("~s~n", [escape(I)]); - (I) when is_tuple(I) -> - display_row([escape(V) - || V <- tuple_to_list(I)]) - end, - lists:sort(L)), - ok; -display_list(Other) -> Other. - display_call_result(Node, MFA) -> case call(Node, MFA) of {badrpc, _} = Res -> throw(Res); @@ -485,3 +476,22 @@ quit(Status) -> {unix, _} -> halt(Status); {win32, _} -> init:stop(Status) end. + +log_action(Node, Command, Args) -> + rabbit_misc:with_local_io( + fun () -> + error_logger:info_msg("~p executing~n rabbitmqctl ~s ~s~n", + [Node, Command, + format_args(mask_args(Command, Args))]) + end). + +%% Mask passwords and other sensitive info before logging. +mask_args("add_user", [Name, _Password | Args]) -> + [Name, "****" | Args]; +mask_args("change_password", [Name, _Password | Args]) -> + [Name, "****" | Args]; +mask_args(_, Args) -> + Args. + +format_args(Args) -> + string:join([io_lib:format("~p", [A]) || A <- Args], " "). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 3fb0817a..93aad9e3 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -71,7 +71,7 @@ publish1(RoutingKey, Format, Data, LogExch) -> %% second resolution, not millisecond. Timestamp = rabbit_misc:now_ms() div 1000, {ok, _RoutingRes, _DeliveredQPids} = - rabbit_basic:publish(LogExch, RoutingKey, false, false, none, + rabbit_basic:publish(LogExch, RoutingKey, false, false, #'P_basic'{content_type = <<"text/plain">>, timestamp = Timestamp}, list_to_binary(io_lib:format(Format, Data))), diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 9ed532db..bb765566 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -19,14 +19,14 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]). +-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/1]). -export([reset_stats_timer/1]). -export([stats_level/1, if_enabled/2]). -export([notify/2, notify_if/3]). %%---------------------------------------------------------------------------- --record(state, {level, timer}). +-record(state, {level, interval, timer}). %%---------------------------------------------------------------------------- @@ -49,6 +49,7 @@ -opaque(state() :: #state { level :: level(), + interval :: integer(), timer :: atom() }). @@ -56,7 +57,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(init_stats_timer/0 :: () -> state()). --spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()). +-spec(ensure_stats_timer/3 :: (state(), pid(), term()) -> state()). -spec(stop_stats_timer/1 :: (state()) -> state()). -spec(reset_stats_timer/1 :: (state()) -> state()). -spec(stats_level/1 :: (state()) -> level()). @@ -79,7 +80,7 @@ start_link() -> %% if_enabled(internal_emit_stats) - so we immediately send something %% %% On wakeup: -%% ensure_stats_timer(Timer, emit_stats) +%% ensure_stats_timer(Timer, Pid, emit_stats) %% (Note we can't emit stats immediately, the timer may have fired 1ms ago.) %% %% emit_stats: @@ -95,15 +96,16 @@ start_link() -> init_stats_timer() -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), - #state{level = StatsLevel, timer = undefined}. + {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), + #state{level = StatsLevel, interval = Interval, timer = undefined}. -ensure_stats_timer(State = #state{level = none}, _Fun) -> +ensure_stats_timer(State = #state{level = none}, _Pid, _Msg) -> State; -ensure_stats_timer(State = #state{timer = undefined}, Fun) -> - {ok, TRef} = timer:apply_after(?STATS_INTERVAL, - erlang, apply, [Fun, []]), +ensure_stats_timer(State = #state{interval = Interval, + timer = undefined}, Pid, Msg) -> + TRef = erlang:send_after(Interval, Pid, Msg), State#state{timer = TRef}; -ensure_stats_timer(State, _Fun) -> +ensure_stats_timer(State, _Pid, _Msg) -> State. stop_stats_timer(State = #state{level = none}) -> @@ -111,7 +113,7 @@ stop_stats_timer(State = #state{level = none}) -> stop_stats_timer(State = #state{timer = undefined}) -> State; stop_stats_timer(State = #state{timer = TRef}) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State#state{timer = undefined}. reset_stats_timer(State) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index cab1b99f..afa48355 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -20,9 +20,9 @@ -export([recover/0, callback/3, declare/6, assert_equivalence/6, assert_args_equivalence/2, check_type/1, - lookup/1, lookup_or_die/1, list/1, + lookup/1, lookup_or_die/1, list/1, update_scratch/2, info_keys/0, info/1, info/2, info_all/1, info_all/2, - publish/2, delete/2]). + route/2, delete/2]). %% these must be run inside a mnesia tx -export([maybe_auto_delete/1, serial/1, peek_serial/1]). @@ -58,6 +58,7 @@ (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit()). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). +-spec(update_scratch/2 :: (name(), fun((any()) -> any())) -> 'ok'). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -66,8 +67,8 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). --spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) - -> {rabbit_router:routing_result(), [pid()]}). +-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> [rabbit_amqqueue:name()]). -spec(delete/2 :: (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | @@ -199,6 +200,23 @@ list(VHostPath) -> rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). +update_scratch(Name, Fun) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_exchange, Name}) of + [X = #exchange{durable = Durable, scratch = Scratch}] -> + X1 = X#exchange{scratch = Fun(Scratch)}, + ok = mnesia:write(rabbit_exchange, X1, write), + case Durable of + true -> ok = mnesia:write(rabbit_durable_exchange, + X1, write); + _ -> ok + end; + [] -> + ok + end + end). + info_keys() -> ?INFO_KEYS. map(VHostPath, F) -> @@ -224,21 +242,19 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X = #exchange{name = XName}, Delivery) -> - rabbit_router:deliver( - route(Delivery, {queue:from_list([X]), XName, []}), - Delivery). +route(X = #exchange{name = XName}, Delivery) -> + route1(Delivery, {queue:from_list([X]), XName, []}). -route(Delivery, {WorkList, SeenXs, QNames}) -> +route1(Delivery, {WorkList, SeenXs, QNames}) -> case queue:out(WorkList) of {empty, _WorkList} -> lists:usort(QNames); {{value, X = #exchange{type = Type}}, WorkList1} -> DstNames = process_alternate( X, ((type_to_module(Type)):route(X, Delivery))), - route(Delivery, - lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, - DstNames)) + route1(Delivery, + lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, + DstNames)) end. process_alternate(#exchange{name = XName, arguments = Args}, []) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 1b72dd76..8f9ab032 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -49,7 +49,7 @@ -record(lim, {prefetch_count = 0, ch_pid, blocked = false, - queues = dict:new(), % QPid -> {MonitorRef, Notify} + queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to @@ -120,9 +120,9 @@ init([ChPid, UnackedMsgCount]) -> prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. -handle_call({can_send, _QPid, _AckRequired}, _From, +handle_call({can_send, QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> - {reply, false, State}; + {reply, false, limit_queue(QPid, State)}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of @@ -196,31 +196,30 @@ limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> blocked(#lim{blocked = Blocked}) -> Blocked. remember_queue(QPid, State = #lim{queues = Queues}) -> - case dict:is_key(QPid, Queues) of + case orddict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), - State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; + State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)}; true -> State end. forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> - case dict:find(QPid, Queues) of - {ok, {MRef, _}} -> - true = erlang:demonitor(MRef), - ok = rabbit_amqqueue:unblock(QPid, ChPid), - State#lim{queues = dict:erase(QPid, Queues)}; - error -> State + case orddict:find(QPid, Queues) of + {ok, {MRef, _}} -> true = erlang:demonitor(MRef), + ok = rabbit_amqqueue:unblock(QPid, ChPid), + State#lim{queues = orddict:erase(QPid, Queues)}; + error -> State end. limit_queue(QPid, State = #lim{queues = Queues}) -> UpdateFun = fun ({MRef, _}) -> {MRef, true} end, - State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. + State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}. notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> {QList, NewQueues} = - dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; - (QPid, {MRef, true}, {L, D}) -> - {[QPid | L], dict:store(QPid, {MRef, false}, D)} - end, {[], Queues}, Queues), + orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + (QPid, {MRef, true}, {L, D}) -> + {[QPid | L], orddict:store(QPid, {MRef, false}, D)} + end, {[], Queues}, Queues), case length(QList) of 0 -> ok; L -> diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl new file mode 100644 index 00000000..f6664a27 --- /dev/null +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -0,0 +1,395 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_coordinator). + +-export([start_link/3, get_gm/1, ensure_monitoring/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([joined/2, members_changed/3, handle_msg/3]). + +-behaviour(gen_server2). +-behaviour(gm). + +-include("rabbit.hrl"). +-include("gm_specs.hrl"). + +-record(state, { q, + gm, + monitors, + death_fun + }). + +-define(ONE_SECOND, 1000). + +-ifdef(use_specs). + +-spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined', + rabbit_mirror_queue_master:death_fun()) -> + rabbit_types:ok_pid_or_error()). +-spec(get_gm/1 :: (pid()) -> pid()). +-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- +%% +%% Mirror Queues +%% +%% A queue with mirrors consists of the following: +%% +%% #amqqueue{ pid, mirror_pids } +%% | | +%% +----------+ +-------+--------------+-----------...etc... +%% | | | +%% V V V +%% amqqueue_process---+ slave-----+ slave-----+ ...etc... +%% | BQ = master----+ | | BQ = vq | | BQ = vq | +%% | | BQ = vq | | +-+-------+ +-+-------+ +%% | +-+-------+ | | | +%% +-++-----|---------+ | | (some details elided) +%% || | | | +%% || coordinator-+ | | +%% || +-+---------+ | | +%% || | | | +%% || gm-+ -- -- -- -- gm-+- -- -- -- gm-+- -- --...etc... +%% || +--+ +--+ +--+ +%% || +%% consumers +%% +%% The master is merely an implementation of bq, and thus is invoked +%% through the normal bq interface by the amqqueue_process. The slaves +%% meanwhile are processes in their own right (as is the +%% coordinator). The coordinator and all slaves belong to the same gm +%% group. Every member of a gm group receives messages sent to the gm +%% group. Because the master is the bq of amqqueue_process, it doesn't +%% have sole control over its mailbox, and as a result, the master +%% itself cannot be passed messages directly (well, it could by via +%% the amqqueue:run_backing_queue callback but that would induce +%% additional unnecessary loading on the master queue process), yet it +%% needs to react to gm events, such as the death of slaves. Thus the +%% master creates the coordinator, and it is the coordinator that is +%% the gm callback module and event handler for the master. +%% +%% Consumers are only attached to the master. Thus the master is +%% responsible for informing all slaves when messages are fetched from +%% the bq, when they're acked, and when they're requeued. +%% +%% The basic goal is to ensure that all slaves performs actions on +%% their bqs in the same order as the master. Thus the master +%% intercepts all events going to its bq, and suitably broadcasts +%% these events on the gm. The slaves thus receive two streams of +%% events: one stream is via the gm, and one stream is from channels +%% directly. Whilst the stream via gm is guaranteed to be consistently +%% seen by all slaves, the same is not true of the stream via +%% channels. For example, in the event of an unexpected death of a +%% channel during a publish, only some of the mirrors may receive that +%% publish. As a result of this problem, the messages broadcast over +%% the gm contain published content, and thus slaves can operate +%% successfully on messages that they only receive via the gm. The key +%% purpose of also sending messages directly from the channels to the +%% slaves is that without this, in the event of the death of the +%% master, messages could be lost until a suitable slave is promoted. +%% +%% However, that is not the only reason. For example, if confirms are +%% in use, then there is no guarantee that every slave will see the +%% delivery with the same msg_seq_no. As a result, the slaves have to +%% wait until they've seen both the publish via gm, and the publish +%% via the channel before they have enough information to be able to +%% perform the publish to their own bq, and subsequently issue the +%% confirm, if necessary. Either form of publish can arrive first, and +%% a slave can be upgraded to the master at any point during this +%% process. Confirms continue to be issued correctly, however. +%% +%% Because the slave is a full process, it impersonates parts of the +%% amqqueue API. However, it does not need to implement all parts: for +%% example, no ack or consumer-related message can arrive directly at +%% a slave from a channel: it is only publishes that pass both +%% directly to the slaves and go via gm. +%% +%% Slaves can be added dynamically. When this occurs, there is no +%% attempt made to sync the current contents of the master with the +%% new slave, thus the slave will start empty, regardless of the state +%% of the master. Thus the slave needs to be able to detect and ignore +%% operations which are for messages it has not received: because of +%% the strict FIFO nature of queues in general, this is +%% straightforward - all new publishes that the new slave receives via +%% gm should be processed as normal, but fetches which are for +%% messages the slave has never seen should be ignored. Similarly, +%% acks for messages the slave never fetched should be +%% ignored. Eventually, as the master is consumed from, the messages +%% at the head of the queue which were there before the slave joined +%% will disappear, and the slave will become fully synced with the +%% state of the master. The detection of the sync-status of a slave is +%% done entirely based on length: if the slave and the master both +%% agree on the length of the queue after the fetch of the head of the +%% queue, then the queues must be in sync. The only other possibility +%% is that the slave's queue is shorter, and thus the fetch should be +%% ignored. +%% +%% Because acktags are issued by the bq independently, and because +%% there is no requirement for the master and all slaves to use the +%% same bq, all references to msgs going over gm is by msg_id. Thus +%% upon acking, the master must convert the acktags back to msg_ids +%% (which happens to be what bq:ack returns), then sends the msg_ids +%% over gm, the slaves must convert the msg_ids to acktags (a mapping +%% the slaves themselves must maintain). +%% +%% When the master dies, a slave gets promoted. This will be the +%% eldest slave, and thus the hope is that that slave is most likely +%% to be sync'd with the master. The design of gm is that the +%% notification of the death of the master will only appear once all +%% messages in-flight from the master have been fully delivered to all +%% members of the gm group. Thus at this point, the slave that gets +%% promoted cannot broadcast different events in a different order +%% than the master for the same msgs: there is no possibility for the +%% same msg to be processed by the old master and the new master - if +%% it was processed by the old master then it will have been processed +%% by the slave before the slave was promoted, and vice versa. +%% +%% Upon promotion, all msgs pending acks are requeued as normal, the +%% slave constructs state suitable for use in the master module, and +%% then dynamically changes into an amqqueue_process with the master +%% as the bq, and the slave's bq as the master's bq. Thus the very +%% same process that was the slave is now a full amqqueue_process. +%% +%% It is important that we avoid memory leaks due to the death of +%% senders (i.e. channels) and partial publications. A sender +%% publishing a message may fail mid way through the publish and thus +%% only some of the mirrors will receive the message. We need the +%% mirrors to be able to detect this and tidy up as necessary to avoid +%% leaks. If we just had the master monitoring all senders then we +%% would have the possibility that a sender appears and only sends the +%% message to a few of the slaves before dying. Those slaves would +%% then hold on to the message, assuming they'll receive some +%% instruction eventually from the master. Thus we have both slaves +%% and the master monitor all senders they become aware of. But there +%% is a race: if the slave receives a DOWN of a sender, how does it +%% know whether or not the master is going to send it instructions +%% regarding those messages? +%% +%% Whilst the master monitors senders, it can't access its mailbox +%% directly, so it delegates monitoring to the coordinator. When the +%% coordinator receives a DOWN message from a sender, it informs the +%% master via a callback. This allows the master to do any tidying +%% necessary, but more importantly allows the master to broadcast a +%% sender_death message to all the slaves, saying the sender has +%% died. Once the slaves receive the sender_death message, they know +%% that they're not going to receive any more instructions from the gm +%% regarding that sender, thus they throw away any publications from +%% the sender pending publication instructions. However, it is +%% possible that the coordinator receives the DOWN and communicates +%% that to the master before the master has finished receiving and +%% processing publishes from the sender. This turns out not to be a +%% problem: the sender has actually died, and so will not need to +%% receive confirms or other feedback, and should further messages be +%% "received" from the sender, the master will ask the coordinator to +%% set up a new monitor, and will continue to process the messages +%% normally. Slaves may thus receive publishes via gm from previously +%% declared "dead" senders, but again, this is fine: should the slave +%% have just thrown out the message it had received directly from the +%% sender (due to receiving a sender_death message via gm), it will be +%% able to cope with the publication purely from the master via gm. +%% +%% When a slave receives a DOWN message for a sender, if it has not +%% received the sender_death message from the master via gm already, +%% then it will wait 20 seconds before broadcasting a request for +%% confirmation from the master that the sender really has died. +%% Should a sender have only sent a publish to slaves, this allows +%% slaves to inform the master of the previous existence of the +%% sender. The master will thus monitor the sender, receive the DOWN, +%% and subsequently broadcast the sender_death message, allowing the +%% slaves to tidy up. This process can repeat for the same sender: +%% consider one slave receives the publication, then the DOWN, then +%% asks for confirmation of death, then the master broadcasts the +%% sender_death message. Only then does another slave receive the +%% publication and thus set up its monitoring. Eventually that slave +%% too will receive the DOWN, ask for confirmation and the master will +%% monitor the sender again, receive another DOWN, and send out +%% another sender_death message. Given the 20 second delay before +%% requesting death confirmation, this is highly unlikely, but it is a +%% possibility. +%% +%% When the 20 second timer expires, the slave first checks to see +%% whether it still needs confirmation of the death before requesting +%% it. This prevents unnecessary traffic on gm as it allows one +%% broadcast of the sender_death message to satisfy many slaves. +%% +%% If we consider the promotion of a slave at this point, we have two +%% possibilities: that of the slave that has received the DOWN and is +%% thus waiting for confirmation from the master that the sender +%% really is down; and that of the slave that has not received the +%% DOWN. In the first case, in the act of promotion to master, the new +%% master will monitor again the dead sender, and after it has +%% finished promoting itself, it should find another DOWN waiting, +%% which it will then broadcast. This will allow slaves to tidy up as +%% normal. In the second case, we have the possibility that +%% confirmation-of-sender-death request has been broadcast, but that +%% it was broadcast before the master failed, and that the slave being +%% promoted does not know anything about that sender, and so will not +%% monitor it on promotion. Thus a slave that broadcasts such a +%% request, at the point of broadcasting it, recurses, setting another +%% 20 second timer. As before, on expiry of the timer, the slaves +%% checks to see whether it still has not received a sender_death +%% message for the dead sender, and if not, broadcasts a death +%% confirmation request. Thus this ensures that even when a master +%% dies and the new slave has no knowledge of the dead sender, it will +%% eventually receive a death confirmation request, shall monitor the +%% dead sender, receive the DOWN and broadcast the sender_death +%% message. +%% +%% The preceding commentary deals with the possibility of slaves +%% receiving publications from senders which the master does not, and +%% the need to prevent memory leaks in such scenarios. The inverse is +%% also possible: a partial publication may cause only the master to +%% receive a publication. It will then publish the message via gm. The +%% slaves will receive it via gm, will publish it to their BQ and will +%% set up monitoring on the sender. They will then receive the DOWN +%% message and the master will eventually publish the corresponding +%% sender_death message. The slave will then be able to tidy up its +%% state as normal. +%% +%% Recovery of mirrored queues is straightforward: as nodes die, the +%% remaining nodes record this, and eventually a situation is reached +%% in which only one node is alive, which is the master. This is the +%% only node which, upon recovery, will resurrect a mirrored queue: +%% nodes which die and then rejoin as a slave will start off empty as +%% if they have no mirrored content at all. This is not surprising: to +%% achieve anything more sophisticated would require the master and +%% recovering slave to be able to check to see whether they agree on +%% the last seen state of the queue: checking length alone is not +%% sufficient in this case. +%% +%% For more documentation see the comments in bug 23554. +%% +%%---------------------------------------------------------------------------- + +start_link(Queue, GM, DeathFun) -> + gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []). + +get_gm(CPid) -> + gen_server2:call(CPid, get_gm, infinity). + +ensure_monitoring(CPid, Pids) -> + gen_server2:cast(CPid, {ensure_monitoring, Pids}). + +%% --------------------------------------------------------------------------- +%% gen_server +%% --------------------------------------------------------------------------- + +init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> + GM1 = case GM of + undefined -> + {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM2, _Members} -> + ok + end, + GM2; + _ -> + true = link(GM), + GM + end, + {ok, _TRef} = + timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), + {ok, #state { q = Q, + gm = GM1, + monitors = dict:new(), + death_fun = DeathFun }, + hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(get_gm, _From, State = #state { gm = GM }) -> + reply(GM, State). + +handle_cast({gm_deaths, Deaths}, + State = #state { q = #amqqueue { name = QueueName } }) -> + rabbit_log:info("Mirrored-queue (~s): Master ~s saw deaths of mirrors ~s~n", + [rabbit_misc:rs(QueueName), + rabbit_misc:pid_to_string(self()), + [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + {ok, Pid} when node(Pid) =:= node() -> + noreply(State); + {error, not_found} -> + {stop, normal, State} + end; + +handle_cast({ensure_monitoring, Pids}, + State = #state { monitors = Monitors }) -> + Monitors1 = + lists:foldl(fun (Pid, MonitorsN) -> + case dict:is_key(Pid, MonitorsN) of + true -> MonitorsN; + false -> MRef = erlang:monitor(process, Pid), + dict:store(Pid, MRef, MonitorsN) + end + end, Monitors, Pids), + noreply(State #state { monitors = Monitors1 }). + +handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, + State = #state { monitors = Monitors, + death_fun = Fun }) -> + noreply( + case dict:is_key(Pid, Monitors) of + false -> State; + true -> ok = Fun(Pid), + State #state { monitors = dict:erase(Pid, Monitors) } + end); + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(_Reason, #state{}) -> + %% gen_server case + ok; +terminate([_CPid], _Reason) -> + %% gm case + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined([CPid], Members) -> + CPid ! {joined, self(), Members}, + ok. + +members_changed([_CPid], _Births, []) -> + ok; +members_changed([CPid], _Births, Deaths) -> + ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). + +handle_msg([_CPid], _From, heartbeat) -> + ok; +handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> + ok = gen_server2:cast(CPid, Msg); +handle_msg([_CPid], _From, _Msg) -> + ok. + +%% --------------------------------------------------------------------------- +%% Others +%% --------------------------------------------------------------------------- + +noreply(State) -> + {noreply, State, hibernate}. + +reply(Reply, State) -> + {reply, Reply, State, hibernate}. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl new file mode 100644 index 00000000..532911f2 --- /dev/null +++ b/src/rabbit_mirror_queue_master.erl @@ -0,0 +1,390 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_master). + +-export([init/3, terminate/2, delete_and_terminate/2, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, + set_ram_duration_target/2, ram_duration/1, + needs_timeout/1, timeout/1, handle_pre_hibernate/1, + status/1, invoke/3, is_duplicate/2, discard/3]). + +-export([start/1, stop/0]). + +-export([promote_backing_queue_state/6, sender_death_fun/0]). + +-behaviour(rabbit_backing_queue). + +-include("rabbit.hrl"). + +-record(state, { gm, + coordinator, + backing_queue, + backing_queue_state, + set_delivered, + seen_status, + confirmed, + ack_msg_id, + known_senders + }). + +-ifdef(use_specs). + +-export_type([death_fun/0]). + +-type(death_fun() :: fun ((pid()) -> 'ok')). +-type(master_state() :: #state { gm :: pid(), + coordinator :: pid(), + backing_queue :: atom(), + backing_queue_state :: any(), + set_delivered :: non_neg_integer(), + seen_status :: dict(), + confirmed :: [rabbit_guid:guid()], + ack_msg_id :: dict(), + known_senders :: set() + }). + +-spec(promote_backing_queue_state/6 :: + (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). +-spec(sender_death_fun/0 :: () -> death_fun()). + +-endif. + +%% For general documentation of HA design, see +%% rabbit_mirror_queue_coordinator + +%% --------------------------------------------------------------------------- +%% Backing queue +%% --------------------------------------------------------------------------- + +start(_DurableQueues) -> + %% This will never get called as this module will never be + %% installed as the default BQ implementation. + exit({not_valid_for_generic_backing_queue, ?MODULE}). + +stop() -> + %% Same as start/1. + exit({not_valid_for_generic_backing_queue, ?MODULE}). + +init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, + AsyncCallback) -> + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( + Q, undefined, sender_death_fun()), + GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + MNodes1 = + (case MNodes of + all -> rabbit_mnesia:all_clustered_nodes(); + undefined -> []; + _ -> [list_to_atom(binary_to_list(Node)) || Node <- MNodes] + end) -- [node()], + [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, Recover, AsyncCallback), + #state { gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = 0, + seen_status = dict:new(), + confirmed = [], + ack_msg_id = dict:new(), + known_senders = sets:new() }. + +terminate({shutdown, dropped} = Reason, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + %% Backing queue termination - this node has been explicitly + %% dropped. Normally, non-durable queues would be tidied up on + %% startup, but there's a possibility that we will be added back + %% in without this node being restarted. Thus we must do the full + %% blown delete_and_terminate now, but only locally: we do not + %% broadcast delete_and_terminate. + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }; +terminate(Reason, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + %% Backing queue termination. The queue is going down but + %% shouldn't be deleted. Most likely safe shutdown of this + %% node. Thus just let some other slave take over. + State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. + +delete_and_terminate(Reason, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {delete_and_terminate, Reason}), + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }. + +purge(State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {set_length, 0}), + {Count, BQS1} = BQ:purge(BQS), + {Count, State #state { backing_queue_state = BQS1, + set_delivered = 0 }}. + +publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, + State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + false = dict:is_key(MsgId, SS), %% ASSERTION + ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). + +publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, + ChPid, State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + false = dict:is_key(MsgId, SS), %% ASSERTION + %% Must use confirmed_broadcast here in order to guarantee that + %% all slaves are forced to interpret this publish_delivered at + %% the same point, especially if we die and a slave is promoted. + ok = gm:confirmed_broadcast( + GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), + {AckTag, BQS1} = + BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), + AM1 = maybe_store_acktag(AckTag, MsgId, AM), + {AckTag, + ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, + ack_msg_id = AM1 })}. + +dropwhile(Fun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = SetDelivered }) -> + Len = BQ:len(BQS), + BQS1 = BQ:dropwhile(Fun, BQS), + Dropped = Len - BQ:len(BQS1), + SetDelivered1 = lists:max([0, SetDelivered - Dropped]), + ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), + State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 }. + +drain_confirmed(State = #state { backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS, + confirmed = Confirmed }) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + {MsgIds1, SS1} = + lists:foldl( + fun (MsgId, {MsgIdsN, SSN}) -> + %% We will never see 'discarded' here + case dict:find(MsgId, SSN) of + error -> + {[MsgId | MsgIdsN], SSN}; + {ok, published} -> + %% It was published when we were a slave, + %% and we were promoted before we saw the + %% publish from the channel. We still + %% haven't seen the channel publish, and + %% consequently we need to filter out the + %% confirm here. We will issue the confirm + %% when we see the publish from the channel. + {MsgIdsN, dict:store(MsgId, confirmed, SSN)}; + {ok, confirmed} -> + %% Well, confirms are racy by definition. + {[MsgId | MsgIdsN], SSN} + end + end, {[], SS}, MsgIds), + {Confirmed ++ MsgIds1, State #state { backing_queue_state = BQS1, + seen_status = SS1, + confirmed = [] }}. + +fetch(AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = SetDelivered, + ack_msg_id = AM }) -> + {Result, BQS1} = BQ:fetch(AckRequired, BQS), + State1 = State #state { backing_queue_state = BQS1 }, + case Result of + empty -> + {Result, State1}; + {#basic_message { id = MsgId } = Message, IsDelivered, AckTag, + Remaining} -> + ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}), + IsDelivered1 = IsDelivered orelse SetDelivered > 0, + SetDelivered1 = lists:max([0, SetDelivered - 1]), + AM1 = maybe_store_acktag(AckTag, MsgId, AM), + {{Message, IsDelivered1, AckTag, Remaining}, + State1 #state { set_delivered = SetDelivered1, + ack_msg_id = AM1 }} + end. + +ack(AckTags, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + {MsgIds, BQS1} = BQ:ack(AckTags, BQS), + AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), + case MsgIds of + [] -> ok; + _ -> ok = gm:broadcast(GM, {ack, MsgIds}) + end, + {MsgIds, State #state { backing_queue_state = BQS1, + ack_msg_id = AM1 }}. + +requeue(AckTags, MsgPropsFun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}), + {MsgIds, State #state { backing_queue_state = BQS1 }}. + +len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:len(BQS). + +is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:is_empty(BQS). + +set_ram_duration_target(Target, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = + BQ:set_ram_duration_target(Target, BQS) }. + +ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + {Result, BQS1} = BQ:ram_duration(BQS), + {Result, State #state { backing_queue_state = BQS1 }}. + +needs_timeout(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:needs_timeout(BQS). + +timeout(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:timeout(BQS) }. + +handle_pre_hibernate(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. + +status(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:status(BQS). + +invoke(?MODULE, Fun, State) -> + Fun(?MODULE, State); +invoke(Mod, Fun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. + +is_duplicate(Message = #basic_message { id = MsgId }, + State = #state { seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS, + confirmed = Confirmed }) -> + %% Here, we need to deal with the possibility that we're about to + %% receive a message that we've already seen when we were a slave + %% (we received it via gm). Thus if we do receive such message now + %% via the channel, there may be a confirm waiting to issue for + %% it. + + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(MsgId, SS) of + error -> + %% We permit the underlying BQ to have a peek at it, but + %% only if we ourselves are not filtering out the msg. + {Result, BQS1} = BQ:is_duplicate(Message, BQS), + {Result, State #state { backing_queue_state = BQS1 }}; + {ok, published} -> + %% It already got published when we were a slave and no + %% confirmation is waiting. amqqueue_process will have, in + %% its msg_id_to_channel mapping, the entry for dealing + %% with the confirm when that comes back in (it's added + %% immediately after calling is_duplicate). The msg is + %% invalid. We will not see this again, nor will we be + %% further involved in confirming this message, so erase. + {published, State #state { seen_status = dict:erase(MsgId, SS) }}; + {ok, confirmed} -> + %% It got published when we were a slave via gm, and + %% confirmed some time after that (maybe even after + %% promotion), but before we received the publish from the + %% channel, so couldn't previously know what the + %% msg_seq_no was (and thus confirm as a slave). So we + %% need to confirm now. As above, amqqueue_process will + %% have the entry for the msg_id_to_channel mapping added + %% immediately after calling is_duplicate/2. + {published, State #state { seen_status = dict:erase(MsgId, SS), + confirmed = [MsgId | Confirmed] }}; + {ok, discarded} -> + %% Don't erase from SS here because discard/2 is about to + %% be called and we need to be able to detect this case + {discarded, State} + end. + +discard(Msg = #basic_message { id = MsgId }, ChPid, + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> + %% It's a massive error if we get told to discard something that's + %% already been published or published-and-confirmed. To do that + %% would require non FIFO access. Hence we should not find + %% 'published' or 'confirmed' in this dict:find. + case dict:find(MsgId, SS) of + error -> + ok = gm:broadcast(GM, {discard, ChPid, Msg}), + State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS), + seen_status = dict:erase(MsgId, SS) }; + {ok, discarded} -> + State + end. + +%% --------------------------------------------------------------------------- +%% Other exported functions +%% --------------------------------------------------------------------------- + +promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> + #state { gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = BQ:len(BQS), + seen_status = SeenStatus, + confirmed = [], + ack_msg_id = dict:new(), + known_senders = sets:from_list(KS) }. + +sender_death_fun() -> + Self = self(), + fun (DeadPid) -> + rabbit_amqqueue:run_backing_queue( + Self, ?MODULE, + fun (?MODULE, State = #state { gm = GM, known_senders = KS }) -> + ok = gm:broadcast(GM, {sender_death, DeadPid}), + KS1 = sets:del_element(DeadPid, KS), + State #state { known_senders = KS1 } + end) + end. + +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +maybe_store_acktag(undefined, _MsgId, AM) -> + AM; +maybe_store_acktag(AckTag, MsgId, AM) -> + dict:store(AckTag, MsgId, AM). + +ensure_monitoring(ChPid, State = #state { coordinator = CPid, + known_senders = KS }) -> + case sets:is_element(ChPid, KS) of + true -> State; + false -> ok = rabbit_mirror_queue_coordinator:ensure_monitoring( + CPid, [ChPid]), + State #state { known_senders = sets:add_element(ChPid, KS) } + end. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl new file mode 100644 index 00000000..6a9f733e --- /dev/null +++ b/src/rabbit_mirror_queue_misc.erl @@ -0,0 +1,135 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_misc). + +-export([remove_from_queue/2, on_node_up/0, + drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3]). + +-include("rabbit.hrl"). + +%% If the dead pids include the queue pid (i.e. the master has died) +%% then only remove that if we are about to be promoted. Otherwise we +%% can have the situation where a slave updates the mnesia record for +%% a queue, promoting another slave before that slave realises it has +%% become the new master, which is bad because it could then mean the +%% slave (now master) receives messages it's not ready for (for +%% example, new consumers). +remove_from_queue(QueueName, DeadPids) -> + DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], + rabbit_misc:execute_mnesia_transaction( + fun () -> + %% Someone else could have deleted the queue before we + %% get here. + case mnesia:read({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [Q = #amqqueue { pid = QPid, + slave_pids = SPids }] -> + [QPid1 | SPids1] = + [Pid || Pid <- [QPid | SPids], + not lists:member(node(Pid), DeadNodes)], + case {{QPid, SPids}, {QPid1, SPids1}} of + {Same, Same} -> + ok; + _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> + %% Either master hasn't changed, so + %% we're ok to update mnesia; or we have + %% become the master. + Q1 = Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }, + ok = rabbit_amqqueue:store_queue(Q1); + _ -> + %% Master has changed, and we're not it, + %% so leave alone to allow the promoted + %% slave to find it and make its + %% promotion atomic. + ok + end, + {ok, QPid1} + end + end). + +on_node_up() -> + Qs = + rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:foldl( + fun (#amqqueue { mirror_nodes = undefined }, QsN) -> + QsN; + (#amqqueue { name = QName, + mirror_nodes = all }, QsN) -> + [QName | QsN]; + (#amqqueue { name = QName, + mirror_nodes = MNodes }, QsN) -> + case lists:member(node(), MNodes) of + true -> [QName | QsN]; + false -> QsN + end + end, [], rabbit_queue) + end), + [add_mirror(Q, node()) || Q <- Qs], + ok. + +drop_mirror(VHostPath, QueueName, MirrorNode) -> + drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). + +drop_mirror(Queue, MirrorNode) -> + if_mirrored_queue( + Queue, + fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + {error, {queue_not_mirrored_on_node, MirrorNode}}; + [QPid] when SPids =:= [] -> + {error, cannot_drop_only_mirror}; + [Pid] -> + rabbit_log:info( + "Dropping queue mirror on node ~p for ~s~n", + [MirrorNode, rabbit_misc:rs(Name)]), + exit(Pid, {shutdown, dropped}), + ok + end + end). + +add_mirror(VHostPath, QueueName, MirrorNode) -> + add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). + +add_mirror(Queue, MirrorNode) -> + if_mirrored_queue( + Queue, + fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> Result = rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]), + rabbit_log:info( + "Adding mirror of queue ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, Result]), + case Result of + {ok, _Pid} -> ok; + _ -> Result + end; + [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} + end + end). + +if_mirrored_queue(Queue, Fun) -> + rabbit_amqqueue:with( + Queue, fun (#amqqueue { arguments = Args } = Q) -> + case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of + undefined -> ok; + _ -> Fun(Q) + end + end). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl new file mode 100644 index 00000000..c918f388 --- /dev/null +++ b/src/rabbit_mirror_queue_slave.erl @@ -0,0 +1,851 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_slave). + +%% For general documentation of HA design, see +%% rabbit_mirror_queue_coordinator +%% +%% We join the GM group before we add ourselves to the amqqueue +%% record. As a result: +%% 1. We can receive msgs from GM that correspond to messages we will +%% never receive from publishers. +%% 2. When we receive a message from publishers, we must receive a +%% message from the GM group for it. +%% 3. However, that instruction from the GM group can arrive either +%% before or after the actual message. We need to be able to +%% distinguish between GM instructions arriving early, and case (1) +%% above. +%% +%% All instructions from the GM group must be processed in the order +%% in which they're received. + +-export([start_link/1, set_maximum_since_use/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2, prioritise_info/2]). + +-export([joined/2, members_changed/3, handle_msg/3]). + +-behaviour(gen_server2). +-behaviour(gm). + +-include("rabbit.hrl"). +-include("gm_specs.hrl"). + +-define(SYNC_INTERVAL, 25). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(DEATH_TIMEOUT, 20000). %% 20 seconds + +-record(state, { q, + gm, + master_pid, + backing_queue, + backing_queue_state, + sync_timer_ref, + rate_timer_ref, + + sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId} + msg_id_ack, %% :: MsgId -> AckTag + ack_num, + + msg_id_status, + known_senders + }). + +start_link(Q) -> + gen_server2:start_link(?MODULE, [Q], []). + +set_maximum_since_use(QPid, Age) -> + gen_server2:cast(QPid, {set_maximum_since_use, Age}). + +init([#amqqueue { name = QueueName } = Q]) -> + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> + ok + end, + Self = self(), + Node = node(), + {ok, MPid} = + rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + %% ASSERTION + [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], + MPids1 = MPids ++ [Self], + ok = rabbit_amqqueue:store_queue( + Q1 #amqqueue { slave_pids = MPids1 }), + {ok, QPid} + end), + erlang:monitor(process, MPid), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = bq_init(BQ, Q, false), + {ok, #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new() + }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> + %% Synchronous, "immediate" delivery mode + + %% It is safe to reply 'false' here even if a) we've not seen the + %% msg via gm, or b) the master dies before we receive the msg via + %% gm. In the case of (a), we will eventually receive the msg via + %% gm, and it's only the master's result to the channel that is + %% important. In the case of (b), if the master does die and we do + %% get promoted then at that point we have no consumers, thus + %% 'false' is precisely the correct answer. However, we must be + %% careful to _not_ enqueue the message in this case. + + %% Note this is distinct from the case where we receive the msg + %% via gm first, then we're promoted to master, and only then do + %% we receive the msg from the channel. + gen_server2:reply(From, false), %% master may deliver it, not us + noreply(maybe_enqueue_message(Delivery, false, State)); + +handle_call({deliver, Delivery = #delivery {}}, From, State) -> + %% Synchronous, "mandatory" delivery mode + gen_server2:reply(From, true), %% amqqueue throws away the result anyway + noreply(maybe_enqueue_message(Delivery, true, State)); + +handle_call({gm_deaths, Deaths}, From, + State = #state { q = #amqqueue { name = QueueName }, + gm = GM, + master_pid = MPid }) -> + rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n", + [rabbit_misc:rs(QueueName), + rabbit_misc:pid_to_string(self()), + [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), + %% The GM has told us about deaths, which means we're not going to + %% receive any more messages from GM + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + {ok, Pid} when node(Pid) =:= node(MPid) -> + %% master hasn't changed + reply(ok, State); + {ok, Pid} when node(Pid) =:= node() -> + %% we've become master + promote_me(From, State); + {ok, Pid} -> + %% master has changed to not us. + gen_server2:reply(From, ok), + erlang:monitor(process, Pid), + ok = gm:broadcast(GM, heartbeat), + noreply(State #state { master_pid = Pid }); + {error, not_found} -> + gen_server2:reply(From, ok), + {stop, normal, State} + end. + +handle_cast({run_backing_queue, Mod, Fun}, State) -> + noreply(run_backing_queue(Mod, Fun, State)); + +handle_cast({gm, Instruction}, State) -> + handle_process_result(process_instruction(Instruction, State)); + +handle_cast({deliver, Delivery = #delivery {}}, State) -> + %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + noreply(maybe_enqueue_message(Delivery, true, State)); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State); + +handle_cast({set_ram_duration_target, Duration}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + noreply(State #state { backing_queue_state = BQS1 }). + +handle_info(update_ram_duration, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State #state { rate_timer_ref = just_measured, + backing_queue_state = BQS2 }); + +handle_info(sync_timeout, State) -> + noreply(backing_queue_timeout( + State #state { sync_timer_ref = undefined })); + +handle_info(timeout, State) -> + noreply(backing_queue_timeout(State)); + +handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, + State = #state { gm = GM, master_pid = MPid }) -> + ok = gm:broadcast(GM, {process_death, MPid}), + noreply(State); + +handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> + noreply(local_sender_death(ChPid, State)); + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +%% If the Reason is shutdown, or {shutdown, _}, it is not the queue +%% being deleted: it's just the node going down. Even though we're a +%% slave, we have no idea whether or not we'll be the only copy coming +%% back up. Thus we must assume we will be, and preserve anything we +%% have on disk. +terminate(_Reason, #state { backing_queue_state = undefined }) -> + %% We've received a delete_and_terminate from gm, thus nothing to + %% do here. + ok; +terminate({shutdown, dropped} = R, #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + %% See rabbit_mirror_queue_master:terminate/2 + BQ:delete_and_terminate(R, BQS); +terminate(Reason, #state { q = Q, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef }) -> + ok = gm:leave(GM), + QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( + Q, BQ, BQS, RateTRef, [], [], dict:new()), + rabbit_amqqueue_process:terminate(Reason, QueueState); +terminate([_SPid], _Reason) -> + %% gm case + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_pre_hibernate(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + BQS3 = BQ:handle_pre_hibernate(BQS2), + {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. + +prioritise_call(Msg, _From, _State) -> + case Msg of + {gm_deaths, _Deaths} -> 5; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + {run_backing_queue, _Mod, _Fun} -> 6; + {gm, _Msg} -> 5; + {post_commit, _Txn, _AckTags} -> 4; + _ -> 0 + end. + +prioritise_info(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + sync_timeout -> 6; + _ -> 0 + end. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined([SPid], _Members) -> + SPid ! {joined, self()}, + ok. + +members_changed([_SPid], _Births, []) -> + ok; +members_changed([SPid], _Births, Deaths) -> + inform_deaths(SPid, Deaths). + +handle_msg([_SPid], _From, heartbeat) -> + ok; +handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> + %% This is only of value to the master + ok; +handle_msg([SPid], _From, {process_death, Pid}) -> + inform_deaths(SPid, [Pid]); +handle_msg([SPid], _From, Msg) -> + ok = gen_server2:cast(SPid, {gm, Msg}). + +inform_deaths(SPid, Deaths) -> + rabbit_misc:with_exit_handler( + fun () -> {stop, normal} end, + fun () -> + case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of + ok -> + ok; + {promote, CPid} -> + {become, rabbit_mirror_queue_coordinator, [CPid]} + end + end). + +%% --------------------------------------------------------------------------- +%% Others +%% --------------------------------------------------------------------------- + +bq_init(BQ, Q, Recover) -> + Self = self(), + BQ:init(Q, Recover, + fun (Mod, Fun) -> + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) + end). + +run_backing_queue(rabbit_mirror_queue_master, Fun, State) -> + %% Yes, this might look a little crazy, but see comments in + %% confirm_sender_death/1 + Fun(?MODULE, State); +run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. + +needs_confirming(#delivery{ msg_seq_no = undefined }, _State) -> + never; +needs_confirming(#delivery { message = #basic_message { + is_persistent = true } }, + #state { q = #amqqueue { durable = true } }) -> + eventually; +needs_confirming(_Delivery, _State) -> + immediately. + +confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> + {MS1, CMs} = + lists:foldl( + fun (MsgId, {MSN, CMsN} = Acc) -> + %% We will never see 'discarded' here + case dict:find(MsgId, MSN) of + error -> + %% If it needed confirming, it'll have + %% already been done. + Acc; + {ok, {published, ChPid}} -> + %% Still not seen it from the channel, just + %% record that it's been confirmed. + {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN}; + {ok, {published, ChPid, MsgSeqNo}} -> + %% Seen from both GM and Channel. Can now + %% confirm. + {dict:erase(MsgId, MSN), + gb_trees_cons(ChPid, MsgSeqNo, CMsN)}; + {ok, {confirmed, _ChPid}} -> + %% It's already been confirmed. This is + %% probably it's been both sync'd to disk + %% and then delivered and ack'd before we've + %% seen the publish from the + %% channel. Nothing to do here. + Acc + end + end, {MS, gb_trees:empty()}, MsgIds), + [ok = rabbit_channel:confirm(ChPid, MsgSeqNos) + || {ChPid, MsgSeqNos} <- gb_trees:to_list(CMs)], + State #state { msg_id_status = MS1 }. + +gb_trees_cons(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); + none -> gb_trees:insert(Key, [Value], Tree) + end. + +handle_process_result({ok, State}) -> noreply(State); +handle_process_result({stop, State}) -> {stop, normal, State}. + +promote_me(From, #state { q = Q, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef, + sender_queues = SQ, + msg_id_ack = MA, + msg_id_status = MS, + known_senders = KS }) -> + rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", + [rabbit_misc:rs(Q #amqqueue.name), + rabbit_misc:pid_to_string(self())]), + Q1 = Q #amqqueue { pid = self() }, + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( + Q1, GM, rabbit_mirror_queue_master:sender_death_fun()), + true = unlink(GM), + gen_server2:reply(From, {promote, CPid}), + ok = gm:confirmed_broadcast(GM, heartbeat), + + %% Everything that we're monitoring, we need to ensure our new + %% coordinator is monitoring. + + MonitoringPids = [begin true = erlang:demonitor(MRef), + Pid + end || {Pid, MRef} <- dict:to_list(KS)], + ok = rabbit_mirror_queue_coordinator:ensure_monitoring( + CPid, MonitoringPids), + + %% We find all the messages that we've received from channels but + %% not from gm, and if they're due to be enqueued on promotion + %% then we pass them to the + %% queue_process:init_with_backing_queue_state to be enqueued. + %% + %% We also have to requeue messages which are pending acks: the + %% consumers from the master queue have been lost and so these + %% messages need requeuing. They might also be pending + %% confirmation, and indeed they might also be pending arrival of + %% the publication from the channel itself, if we received both + %% the publication and the fetch via gm first! Requeuing doesn't + %% affect confirmations: if the message was previously pending a + %% confirmation then it still will be, under the same msg_id. So + %% as a master, we need to be prepared to filter out the + %% publication of said messages from the channel (is_duplicate + %% (thus such requeued messages must remain in the msg_id_status + %% (MS) which becomes seen_status (SS) in the master)). + %% + %% Then there are messages we already have in the queue, which are + %% not currently pending acknowledgement: + %% 1. Messages we've only received via gm: + %% Filter out subsequent publication from channel through + %% validate_message. Might have to issue confirms then or + %% later, thus queue_process state will have to know that + %% there's a pending confirm. + %% 2. Messages received via both gm and channel: + %% Queue will have to deal with issuing confirms if necessary. + %% + %% MS contains the following three entry types: + %% + %% a) {published, ChPid}: + %% published via gm only; pending arrival of publication from + %% channel, maybe pending confirm. + %% + %% b) {published, ChPid, MsgSeqNo}: + %% published via gm and channel; pending confirm. + %% + %% c) {confirmed, ChPid}: + %% published via gm only, and confirmed; pending publication + %% from channel. + %% + %% d) discarded + %% seen via gm only as discarded. Pending publication from + %% channel + %% + %% The forms a, c and d only, need to go to the master state + %% seen_status (SS). + %% + %% The form b only, needs to go through to the queue_process + %% state to form the msg_id_to_channel mapping (MTC). + %% + %% No messages that are enqueued from SQ at this point will have + %% entries in MS. + %% + %% Messages that are extracted from MA may have entries in MS, and + %% those messages are then requeued. However, as discussed above, + %% this does not affect MS, nor which bits go through to SS in + %% Master, or MTC in queue_process. + %% + %% Everything that's in MA gets requeued. Consequently the new + %% master should start with a fresh AM as there are no messages + %% pending acks. + + MSList = dict:to_list(MS), + SS = dict:from_list( + [E || E = {_MsgId, discarded} <- MSList] ++ + [{MsgId, Status} + || {MsgId, {Status, _ChPid}} <- MSList, + Status =:= published orelse Status =:= confirmed]), + + MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( + CPid, BQ, BQS, GM, SS, MonitoringPids), + + MTC = dict:from_list( + [{MsgId, {ChPid, MsgSeqNo}} || + {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]), + NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], + AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], + Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), + {Delivery, true} <- queue:to_list(PubQ)], + QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, + AckTags, Deliveries, MTC), + {become, rabbit_amqqueue_process, QueueState, hibernate}. + +noreply(State) -> + {NewState, Timeout} = next_state(State), + {noreply, NewState, Timeout}. + +reply(Reply, State) -> + {NewState, Timeout} = next_state(State), + {reply, Reply, NewState, Timeout}. + +next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + State1 = ensure_rate_timer( + confirm_messages(MsgIds, State #state { + backing_queue_state = BQS1 })), + case BQ:needs_timeout(BQS1) of + false -> {stop_sync_timer(State1), hibernate}; + idle -> {stop_sync_timer(State1), 0 }; + timed -> {ensure_sync_timer(State1), 0 } + end. + +backing_queue_timeout(State = #state { backing_queue = BQ }) -> + run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). + +ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), + State #state { sync_timer_ref = TRef }; +ensure_sync_timer(State) -> + State. + +stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> + State; +stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> + erlang:cancel_timer(TRef), + State #state { sync_timer_ref = undefined }. + +ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> + TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), + State #state { rate_timer_ref = TRef }; +ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) -> + State #state { rate_timer_ref = undefined }; +ensure_rate_timer(State) -> + State. + +stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> + State; +stop_rate_timer(State = #state { rate_timer_ref = just_measured }) -> + State #state { rate_timer_ref = undefined }; +stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> + erlang:cancel_timer(TRef), + State #state { rate_timer_ref = undefined }. + +ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> + case dict:is_key(ChPid, KS) of + true -> State; + false -> MRef = erlang:monitor(process, ChPid), + State #state { known_senders = dict:store(ChPid, MRef, KS) } + end. + +local_sender_death(ChPid, State = #state { known_senders = KS }) -> + ok = case dict:is_key(ChPid, KS) of + false -> ok; + true -> confirm_sender_death(ChPid) + end, + State. + +confirm_sender_death(Pid) -> + %% We have to deal with the possibility that we'll be promoted to + %% master before this thing gets run. Consequently we set the + %% module to rabbit_mirror_queue_master so that if we do become a + %% rabbit_amqqueue_process before then, sane things will happen. + Fun = + fun (?MODULE, State = #state { known_senders = KS, + gm = GM }) -> + %% We're running still as a slave + ok = case dict:is_key(Pid, KS) of + false -> ok; + true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), + confirm_sender_death(Pid) + end, + State; + (rabbit_mirror_queue_master, State) -> + %% We've become a master. State is now opaque to + %% us. When we became master, if Pid was still known + %% to us then we'd have set up monitoring of it then, + %% so this is now a noop. + State + end, + %% Note that we do not remove our knowledge of this ChPid until we + %% get the sender_death from GM. + {ok, _TRef} = timer:apply_after( + ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue, + [self(), rabbit_mirror_queue_master, Fun]), + ok. + +maybe_enqueue_message( + Delivery = #delivery { message = #basic_message { id = MsgId }, + msg_seq_no = MsgSeqNo, + sender = ChPid }, + EnqueueOnPromotion, + State = #state { sender_queues = SQ, msg_id_status = MS }) -> + State1 = ensure_monitoring(ChPid, State), + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(MsgId, MS) of + error -> + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ), + SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), + State1 #state { sender_queues = SQ1 }; + {ok, {confirmed, ChPid}} -> + %% BQ has confirmed it but we didn't know what the + %% msg_seq_no was at the time. We do now! + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { sender_queues = SQ1, + msg_id_status = dict:erase(MsgId, MS) }; + {ok, {published, ChPid}} -> + %% It was published to the BQ and we didn't know the + %% msg_seq_no so couldn't confirm it at the time. + case needs_confirming(Delivery, State1) of + never -> + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 }; + eventually -> + State1 #state { + msg_id_status = + dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 } + end; + {ok, discarded} -> + %% We've already heard from GM that the msg is to be + %% discarded. We won't see this again. + SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 } + end. + +get_sender_queue(ChPid, SQ) -> + case dict:find(ChPid, SQ) of + error -> {queue:new(), sets:new()}; + {ok, Val} -> Val + end. + +remove_from_pending_ch(MsgId, ChPid, SQ) -> + case dict:find(ChPid, SQ) of + error -> + SQ; + {ok, {MQ, PendingCh}} -> + dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) + end. + +process_instruction( + {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, + State = #state { sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + msg_id_status = MS }) -> + + %% We really are going to do the publish right now, even though we + %% may not have seen it directly from the channel. As a result, we + %% may know that it needs confirming without knowing its + %% msg_seq_no, which means that we can see the confirmation come + %% back from the backing queue without knowing the msg_seq_no, + %% which means that we're going to have to hang on to the fact + %% that we've seen the msg_id confirmed until we can associate it + %% with a msg_seq_no. + State1 = ensure_monitoring(ChPid, State), + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ1, PendingCh1, MS1} = + case queue:out(MQ) of + {empty, _MQ2} -> + {MQ, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, {published, ChPid}, MS)}; + {{value, {Delivery = #delivery { + msg_seq_no = MsgSeqNo, + message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ2} -> + %% We received the msg from the channel first. Thus we + %% need to deal with confirms here. + case needs_confirming(Delivery, State1) of + never -> + {MQ2, PendingCh, MS}; + eventually -> + {MQ2, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + {MQ2, PendingCh, MS} + end; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + %% The instruction was sent to us before we were + %% within the slave_pids within the #amqqueue{} + %% record. We'll never receive the message directly + %% from the channel. And the channel will not be + %% expecting any confirms from us. + {MQ, PendingCh, MS} + end, + + SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 }, + + {ok, + case Deliver of + false -> + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State2 #state { backing_queue_state = BQS1 }; + {true, AckRequired} -> + {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, + ChPid, BQS), + maybe_store_ack(AckRequired, MsgId, AckTag, + State2 #state { backing_queue_state = BQS1 }) + end}; +process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, + State = #state { sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + msg_id_status = MS }) -> + %% Many of the comments around the publish head above apply here + %% too. + State1 = ensure_monitoring(ChPid, State), + {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ1, PendingCh1, MS1} = + case queue:out(MQ) of + {empty, _MQ} -> + {MQ, sets:add_element(MsgId, PendingCh), + dict:store(MsgId, discarded, MS)}; + {{value, {#delivery { message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ2} -> + %% We've already seen it from the channel, we're not + %% going to see this again, so don't add it to MS + {MQ2, PendingCh, MS}; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + %% The instruction was sent to us before we were + %% within the slave_pids within the #amqqueue{} + %% record. We'll never receive the message directly + %% from the channel. + {MQ, PendingCh, MS} + end, + SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + BQS1 = BQ:discard(Msg, ChPid, BQS), + {ok, State1 #state { sender_queues = SQ1, + msg_id_status = MS1, + backing_queue_state = BQS1 }}; +process_instruction({set_length, Length}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + QLen = BQ:len(BQS), + ToDrop = QLen - Length, + {ok, case ToDrop > 0 of + true -> BQS1 = + lists:foldl( + fun (const, BQSN) -> + {{_Msg, _IsDelivered, _AckTag, _Remaining}, + BQSN1} = BQ:fetch(false, BQSN), + BQSN1 + end, BQS, lists:duplicate(ToDrop, const)), + State #state { backing_queue_state = BQS1 }; + false -> State + end}; +process_instruction({fetch, AckRequired, MsgId, Remaining}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + QLen = BQ:len(BQS), + {ok, case QLen - 1 of + Remaining -> + {{#basic_message{id = MsgId}, _IsDelivered, + AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), + maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }); + Other when Other < Remaining -> + %% we must be shorter than the master + State + end}; +process_instruction({ack, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), + [] = MsgIds1 -- MsgIds, %% ASSERTION + {ok, State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }}; +process_instruction({requeue, MsgPropsFun, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {ok, case length(AckTags) =:= length(MsgIds) of + true -> + {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }; + false -> + %% The only thing we can safely do is nuke out our BQ + %% and MA. The interaction between this and confirms + %% doesn't really bear thinking about... + {_Count, BQS1} = BQ:purge(BQS), + {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), + State #state { msg_id_ack = dict:new(), + backing_queue_state = BQS2 } + end}; +process_instruction({sender_death, ChPid}, + State = #state { sender_queues = SQ, + msg_id_status = MS, + known_senders = KS }) -> + {ok, case dict:find(ChPid, KS) of + error -> + State; + {ok, MRef} -> + true = erlang:demonitor(MRef), + MS1 = case dict:find(ChPid, SQ) of + error -> + MS; + {ok, {_MQ, PendingCh}} -> + lists:foldl(fun dict:erase/2, MS, + sets:to_list(PendingCh)) + end, + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = MS1, + known_senders = dict:erase(ChPid, KS) } + end}; +process_instruction({delete_and_terminate, Reason}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQ:delete_and_terminate(Reason, BQS), + {stop, State #state { backing_queue_state = undefined }}. + +msg_ids_to_acktags(MsgIds, MA) -> + {AckTags, MA1} = + lists:foldl( + fun (MsgId, {Acc, MAN}) -> + case dict:find(MsgId, MA) of + error -> {Acc, MAN}; + {ok, {_Num, AckTag}} -> {[AckTag | Acc], + dict:erase(MsgId, MAN)} + end + end, {[], MA}, MsgIds), + {lists:reverse(AckTags), MA1}. + +ack_all(BQ, MA, BQS) -> + BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS). + +maybe_store_ack(false, _MsgId, _AckTag, State) -> + State; +maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, + ack_num = Num }) -> + State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), + ack_num = Num + 1 }. diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl new file mode 100644 index 00000000..fc04ec79 --- /dev/null +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -0,0 +1,48 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_slave_sup). + +-behaviour(supervisor2). + +-export([start/0, start_link/0, start_child/2]). + +-export([init/1]). + +-include_lib("rabbit.hrl"). + +-define(SERVER, ?MODULE). + +start() -> + {ok, _} = + supervisor2:start_child( + rabbit_sup, + {rabbit_mirror_queue_slave_sup, + {rabbit_mirror_queue_slave_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}), + ok. + +start_link() -> + supervisor2:start_link({local, ?SERVER}, ?MODULE, []). + +start_child(Node, Args) -> + supervisor2:start_child({?SERVER, Node}, Args). + +init([]) -> + {ok, {{simple_one_for_one_terminate, 10, 10}, + [{rabbit_mirror_queue_slave, + {rabbit_mirror_queue_slave, start_link, []}, + temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b6b97f6d..b98dbd46 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -42,7 +42,7 @@ -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]). -export([append_file/2, ensure_parent_dirs_exist/1]). --export([format_stderr/2]). +-export([format_stderr/2, with_local_io/1]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). @@ -57,6 +57,7 @@ -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). -export([pget/2, pget/3, pget_or_die/2]). +-export([format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -164,6 +165,7 @@ -spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). +-spec(with_local_io/1 :: (fun (() -> A)) -> A). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). @@ -205,6 +207,7 @@ -spec(pget/2 :: (term(), [term()]) -> term()). -spec(pget/3 :: (term(), [term()], term()) -> term()). -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). +-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -endif. @@ -603,6 +606,17 @@ format_stderr(Fmt, Args) -> end, ok. +%% Execute Fun using the IO system of the local node (i.e. the node on +%% which the code is executing). +with_local_io(Fun) -> + GL = group_leader(), + group_leader(whereis(user), self()), + try + Fun() + after + group_leader(GL, self()) + end. + manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> Iterate(fun (App, Acc) -> case Do(App) of @@ -919,3 +933,24 @@ pget_or_die(K, P) -> undefined -> exit({error, key_missing, K}); V -> V end. + +format_message_queue(_Opt, MQ) -> + Len = priority_queue:len(MQ), + {Len, + case Len > 100 of + false -> priority_queue:to_list(MQ); + true -> {summary, + orddict:to_list( + lists:foldl( + fun ({P, V}, Counts) -> + orddict:update_counter( + {P, format_message_queue_entry(V)}, 1, Counts) + end, orddict:new(), priority_queue:to_list(MQ)))} + end}. + +format_message_queue_entry(V) when is_atom(V) -> + V; +format_message_queue_entry(V) when is_tuple(V) -> + list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]); +format_message_queue_entry(_V) -> + '_'. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 568b9ce6..ab553a8b 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -23,7 +23,8 @@ empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, create_cluster_nodes_config/1, read_cluster_nodes_config/0, record_running_nodes/0, read_previously_running_nodes/0, - delete_previously_running_nodes/0, running_nodes_filename/0]). + delete_previously_running_nodes/0, running_nodes_filename/0, + is_disc_node/0]). -export([table_names/0]). @@ -65,6 +66,7 @@ -spec(read_previously_running_nodes/0 :: () -> [node()]). -spec(delete_previously_running_nodes/0 :: () -> 'ok'). -spec(running_nodes_filename/0 :: () -> file:filename()). +-spec(is_disc_node/0 :: () -> boolean()). -endif. @@ -115,13 +117,47 @@ force_cluster(ClusterNodes) -> cluster(ClusterNodes, Force) -> ensure_mnesia_not_running(), ensure_mnesia_dir(), - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + + %% Wipe mnesia if we're changing type from disc to ram + case {is_disc_node(), should_be_disc_node(ClusterNodes)} of + {true, false} -> error_logger:warning_msg( + "changing node type; wiping mnesia...~n~n"), + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema); + _ -> ok + end, + + %% Pre-emptively leave the cluster + %% + %% We're trying to handle the following two cases: + %% 1. We have a two-node cluster, where both nodes are disc nodes. + %% One node is re-clustered as a ram node. When it tries to + %% re-join the cluster, but before it has time to update its + %% tables definitions, the other node will order it to re-create + %% its disc tables. So, we need to leave the cluster before we + %% can join it again. + %% 2. We have a two-node cluster, where both nodes are disc nodes. + %% One node is forcefully reset (so, the other node thinks its + %% still a part of the cluster). The reset node is re-clustered + %% as a ram node. Same as above, we need to leave the cluster + %% before we can join it. But, since we don't know if we're in a + %% cluster or not, we just pre-emptively leave it before joining. + ProperClusterNodes = ClusterNodes -- [node()], + try + ok = leave_cluster(ProperClusterNodes, ProperClusterNodes) + catch + {error, {no_running_cluster_nodes, _, _}} when Force -> + ok + end, + + %% Join the cluster + start_mnesia(), try ok = init_db(ClusterNodes, Force, fun maybe_upgrade_local_or_record_desired/0), ok = create_cluster_nodes_config(ClusterNodes) after - mnesia:stop() + stop_mnesia() end, ok. @@ -158,10 +194,13 @@ nodes_of_type(Type) -> %% This function should return the nodes of a certain type (ram, %% disc or disc_only) in the current cluster. The type of nodes %% is determined when the cluster is initially configured. - %% Specifically, we check whether a certain table, which we know - %% will be written to disk on a disc node, is stored on disk or in - %% RAM. - mnesia:table_info(rabbit_durable_exchange, Type). + mnesia:table_info(schema, Type). + +%% The tables aren't supposed to be on disk on a ram node +table_definitions(disc) -> + table_definitions(); +table_definitions(ram) -> + [{Tab, copy_type_to_ram(TabDef)} || {Tab, TabDef} <- table_definitions()]. table_definitions() -> [{rabbit_user, @@ -218,8 +257,6 @@ table_definitions() -> {type, ordered_set}, {match, #topic_trie_binding{trie_binding = trie_binding_match(), _='_'}}]}, - %% Consider the implications to nodes_of_type/1 before altering - %% the next entry. {rabbit_durable_exchange, [{record_name, exchange}, {attributes, record_info(fields, exchange)}, @@ -241,7 +278,8 @@ table_definitions() -> {rabbit_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. + {match, #amqqueue{name = queue_name_match(), _='_'}}]}] + ++ gm:table_definitions(). binding_match() -> #binding{source = exchange_name_match(), @@ -340,7 +378,11 @@ check_table_content(Tab, TabDef) -> end. check_tables(Fun) -> - case [Error || {Tab, TabDef} <- table_definitions(), + case [Error || {Tab, TabDef} <- table_definitions( + case is_disc_node() of + true -> disc; + false -> ram + end), case Fun(Tab, TabDef) of ok -> Error = none, false; {error, Error} -> true @@ -441,30 +483,47 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir)} of - {[], false} -> + WantDiscNode = should_be_disc_node(ClusterNodes), + WasDiscNode = is_disc_node(), + %% We create a new db (on disk, or in ram) in the first + %% two cases and attempt to upgrade the in the other two + case {Nodes, WasDiscNode, WantDiscNode} of + {[], _, false} -> + %% New ram node; start from scratch + ok = create_schema(ram); + {[], false, true} -> %% Nothing there at all, start from scratch - ok = create_schema(); - {[], true} -> + ok = create_schema(disc); + {[], true, true} -> %% We're the first node up case rabbit_upgrade:maybe_upgrade_local() of ok -> ensure_schema_integrity(); version_not_available -> ok = schema_ok_or_move() - end, - ok; - {[AnotherNode|_], _} -> + end; + {[AnotherNode|_], _, _} -> %% Subsequent node in cluster, catch up ensure_version_ok( rpc:call(AnotherNode, rabbit_version, recorded, [])), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), + {CopyType, CopyTypeAlt} = + case WantDiscNode of + true -> {disc, disc_copies}; + false -> {ram, ram_copies} + end, ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(case IsDiskNode of - true -> disc; - false -> ram - end), + ok = create_local_table_copy(schema, CopyTypeAlt), + ok = create_local_table_copies(CopyType), + ok = SecondaryPostMnesiaFun(), + %% We've taken down mnesia, so ram nodes will need + %% to re-sync + case is_disc_node() of + false -> start_mnesia(), + mnesia:change_config(extra_db_nodes, + ProperClusterNodes), + wait_for_replicated_tables(); + true -> ok + end, + ensure_schema_integrity(), ok end; @@ -495,7 +554,7 @@ schema_ok_or_move() -> "and recreating schema from scratch~n", [Reason]), ok = move_db(), - ok = create_schema() + ok = create_schema(disc) end. ensure_version_ok({ok, DiscVersion}) -> @@ -507,18 +566,27 @@ ensure_version_ok({ok, DiscVersion}) -> ensure_version_ok({error, _}) -> ok = rabbit_version:record_desired(). -create_schema() -> - mnesia:stop(), - rabbit_misc:ensure_ok(mnesia:create_schema([node()]), - cannot_create_schema), - rabbit_misc:ensure_ok(mnesia:start(), - cannot_start_mnesia), - ok = create_tables(), +create_schema(Type) -> + stop_mnesia(), + case Type of + disc -> rabbit_misc:ensure_ok(mnesia:create_schema([node()]), + cannot_create_schema); + ram -> %% remove the disc schema since this is a ram node + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema) + end, + start_mnesia(), + ok = create_tables(Type), ensure_schema_integrity(), ok = rabbit_version:record_desired(). +is_disc_node() -> mnesia:system_info(use_dir). + +should_be_disc_node(ClusterNodes) -> + ClusterNodes == [] orelse lists:member(node(), ClusterNodes). + move_db() -> - mnesia:stop(), + stop_mnesia(), MnesiaDir = filename:dirname(dir() ++ "/"), {{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(), BackupDir = lists:flatten( @@ -536,14 +604,16 @@ move_db() -> MnesiaDir, BackupDir, Reason}}) end, ensure_mnesia_dir(), - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + start_mnesia(), ok. copy_db(Destination) -> ok = ensure_mnesia_not_running(), rabbit_misc:recursive_copy(dir(), Destination). -create_tables() -> +create_tables() -> create_tables(disc). + +create_tables(Type) -> lists:foreach(fun ({Tab, TabDef}) -> TabDef1 = proplists:delete(match, TabDef), case mnesia:create_table(Tab, TabDef1) of @@ -553,9 +623,13 @@ create_tables() -> Tab, TabDef1, Reason}}) end end, - table_definitions()), + table_definitions(Type)), ok. +copy_type_to_ram(TabDef) -> + [{disc_copies, []}, {ram_copies, [node()]} + | proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))]. + table_has_copy_type(TabDef, DiscType) -> lists:member(node(), proplists:get_value(DiscType, TabDef, [])). @@ -585,7 +659,7 @@ create_local_table_copies(Type) -> end, ok = create_local_table_copy(Tab, StorageType) end, - table_definitions()), + table_definitions(Type)), ok. create_local_table_copy(Tab, Type) -> @@ -621,14 +695,14 @@ reset(Force) -> true -> ok; false -> ensure_mnesia_dir(), - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + start_mnesia(), {Nodes, RunningNodes} = try ok = init(), {all_clustered_nodes() -- [Node], running_clustered_nodes() -- [Node]} after - mnesia:stop() + stop_mnesia() end, leave_cluster(Nodes, RunningNodes), rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), @@ -651,6 +725,7 @@ leave_cluster(Nodes, RunningNodes) -> [schema, node()]) of {atomic, ok} -> true; {badrpc, nodedown} -> false; + {aborted, {node_not_running, _}} -> false; {aborted, Reason} -> throw({error, {failed_to_leave_cluster, Nodes, RunningNodes, Reason}}) @@ -661,3 +736,11 @@ leave_cluster(Nodes, RunningNodes) -> false -> throw({error, {no_running_cluster_nodes, Nodes, RunningNodes}}) end. + +start_mnesia() -> + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + ensure_mnesia_running(). + +stop_mnesia() -> + stopped = mnesia:stop(), + ensure_mnesia_not_running(). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 3f4162cd..e90e1281 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -23,13 +23,14 @@ client_ref/1, close_all_indicated/1, write/3, read/2, contains/2, remove/2, sync/3]). --export([sync/1, set_maximum_since_use/2, - has_readers/2, combine_files/3, delete_file/2]). %% internal +-export([set_maximum_since_use/2, has_readers/2, combine_files/3, + delete_file/2]). %% internal -export([transform_dir/3, force_recovery/2]). %% upgrade --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, prioritise_call/3, prioritise_cast/2, + prioritise_info/2, format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -153,7 +154,6 @@ -spec(sync/3 :: ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok'). --spec(sync/1 :: (server()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> @@ -443,9 +443,6 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> server_cast(CState, {remove, CRef, MsgIds}). sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}). -sync(Server) -> - gen_server2:cast(Server, sync). - set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -682,7 +679,6 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - sync -> 8; {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; {set_maximum_since_use, _Age} -> 8; @@ -690,6 +686,12 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +prioritise_info(Msg, _State) -> + case Msg of + sync -> 8; + _ -> 0 + end. + handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); @@ -773,9 +775,6 @@ handle_cast({sync, MsgIds, K}, true -> noreply(State #msstate { on_sync = [K | Syncs] }) end; -handle_cast(sync, State) -> - noreply(internal_sync(State)); - handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, @@ -799,6 +798,9 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). +handle_info(sync, State) -> + noreply(internal_sync(State)); + handle_info(timeout, State) -> noreply(internal_sync(State)); @@ -836,6 +838,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, code_change(_OldVsn, State, _Extra) -> {ok, State}. +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + %%---------------------------------------------------------------------------- %% general helper functions %%---------------------------------------------------------------------------- @@ -863,13 +867,13 @@ next_state(State = #msstate { on_sync = Syncs, end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), + TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync), State #msstate { sync_timer_ref = TRef }. stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> State; stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), + erlang:cancel_timer(TRef), State #msstate { sync_timer_ref = undefined }. internal_sync(State = #msstate { current_file_handle = CurHdl, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index aaf3df78..6388da8f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -76,11 +76,10 @@ %% the segment file combined with the journal, no writing needs to be %% done to the segment file either (in fact it is deleted if it exists %% at all). This is safe given that the set of acks is a subset of the -%% set of publishes. When it's necessary to sync messages because of -%% transactions, it's only necessary to fsync on the journal: when -%% entries are distributed from the journal to segment files, those -%% segments appended to are fsync'd prior to the journal being -%% truncated. +%% set of publishes. When it is necessary to sync messages, it is +%% sufficient to fsync on the journal: when entries are distributed +%% from the journal to segment files, those segments appended to are +%% fsync'd prior to the journal being truncated. %% %% This module is also responsible for scanning the queue index files %% and seeding the message store on start up. @@ -289,14 +288,13 @@ sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> sync_if([] =/= MsgIds, State). sync(SeqIds, State) -> - %% The SeqIds here contains the SeqId of every publish and ack in - %% the transaction. Ideally we should go through these seqids and - %% only sync the journal if the pubs or acks appear in the + %% The SeqIds here contains the SeqId of every publish and ack to + %% be sync'ed. Ideally we should go through these seqids and only + %% sync the journal if the pubs or acks appear in the %% journal. However, this would be complex to do, and given that %% the variable queue publishes and acks to the qi, and then %% syncs, all in one operation, there is no possibility of the - %% seqids not being in the journal, provided the transaction isn't - %% emptied (handled by sync_if anyway). + %% seqids not being in the journal. sync_if([] =/= SeqIds, State). flush(State = #qistate { dirty_count = 0 }) -> State; @@ -571,13 +569,13 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, add_to_journal(RelSeq, Action, Segment = #segment { journal_entries = JEntries, unacked = UnackedCount }) -> - Segment1 = Segment #segment { - journal_entries = add_to_journal(RelSeq, Action, JEntries) }, - case Action of - del -> Segment1; - ack -> Segment1 #segment { unacked = UnackedCount - 1 }; - ?PUB -> Segment1 #segment { unacked = UnackedCount + 1 } - end; + Segment #segment { + journal_entries = add_to_journal(RelSeq, Action, JEntries), + unacked = UnackedCount + case Action of + ?PUB -> +1; + del -> 0; + ack -> -1 + end}; add_to_journal(RelSeq, Action, JEntries) -> Val = case array:get(RelSeq, JEntries) of diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f5214a77..2dccc748 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -28,8 +28,6 @@ -export([process_channel_frame/5]). %% used by erlang-client --export([emit_stats/1]). - -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). @@ -70,7 +68,6 @@ -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). --spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> @@ -126,9 +123,6 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. -emit_stats(Pid) -> - gen_server:cast(Pid, emit_stats). - conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. @@ -323,8 +317,8 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> catch Error -> {error, Error} end), mainloop(Deb, State); -handle_other({'$gen_cast', emit_stats}, Deb, State) -> - mainloop(Deb, internal_emit_stats(State)); +handle_other(emit_stats, Deb, State) -> + mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); handle_other(Other, _Deb, _State) -> @@ -591,10 +585,8 @@ refuse_connection(Sock, Exception) -> ensure_stats_timer(State = #v1{stats_timer = StatsTimer, connection_state = running}) -> - Self = self(), State#v1{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, - fun() -> emit_stats(Self) end)}; + StatsTimer, self(), emit_stats)}; ensure_stats_timer(State) -> State. @@ -677,7 +669,6 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, end; handle_method0(#'connection.open'{virtual_host = VHostPath}, - State = #v1{connection_state = opening, connection = Connection = #connection{ user = User, @@ -695,7 +686,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), rabbit_event:if_enabled(StatsTimer, - fun() -> internal_emit_stats(State1) end), + fun() -> emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), @@ -924,6 +915,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}}, State1#v1.sock, 0, CloseMethod, Protocol), State1. -internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> +emit_stats(State = #v1{stats_timer = StatsTimer}) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 8f166672..d453a870 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -110,8 +110,10 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. lookup_qpids(QNames) -> lists:foldl(fun (QName, QPids) -> case mnesia:dirty_read({rabbit_queue, QName}) of - [#amqqueue{pid = QPid}] -> [QPid | QPids]; - [] -> QPids + [#amqqueue{pid = QPid, slave_pids = SPids}] -> + [QPid | SPids ++ QPids]; + [] -> + QPids end end, [], QNames). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3f4aa54e..ed4efb47 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -203,6 +203,42 @@ test_priority_queue() -> {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} = test_priority_queue(Q15), + %% 1-element infinity priority Q + Q16 = priority_queue:in(foo, infinity, Q), + {true, false, 1, [{infinity, foo}], [foo]} = test_priority_queue(Q16), + + %% add infinity to 0-priority Q + Q17 = priority_queue:in(foo, infinity, priority_queue:in(bar, Q)), + {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q17), + + %% and the other way around + Q18 = priority_queue:in(bar, priority_queue:in(foo, infinity, Q)), + {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q18), + + %% add infinity to mixed-priority Q + Q19 = priority_queue:in(qux, infinity, Q3), + {true, false, 3, [{infinity, qux}, {2, bar}, {1, foo}], [qux, bar, foo]} = + test_priority_queue(Q19), + + %% merge the above with a negative priority Q + Q20 = priority_queue:join(Q19, Q4), + {true, false, 4, [{infinity, qux}, {2, bar}, {1, foo}, {-1, foo}], + [qux, bar, foo, foo]} = test_priority_queue(Q20), + + %% merge two infinity priority queues + Q21 = priority_queue:join(priority_queue:in(foo, infinity, Q), + priority_queue:in(bar, infinity, Q)), + {true, false, 2, [{infinity, foo}, {infinity, bar}], [foo, bar]} = + test_priority_queue(Q21), + + %% merge two mixed priority with infinity queues + Q22 = priority_queue:join(Q18, Q20), + {true, false, 6, [{infinity, foo}, {infinity, qux}, {2, bar}, {1, foo}, + {0, bar}, {-1, foo}], [foo, qux, bar, foo, bar, foo]} = + test_priority_queue(Q22), + passed. priority_queue_in_all(Q, L) -> @@ -705,7 +741,6 @@ test_topic_expect_match(X, List) -> Res = rabbit_exchange_type_topic:route( X, #delivery{mandatory = false, immediate = false, - txn = none, sender = self(), message = Message}), ExpectedRes = lists:map( @@ -905,7 +940,6 @@ test_option_parser() -> passed. test_cluster_management() -> - %% 'cluster' and 'reset' should only work if the app is stopped {error, _} = control_action(cluster, []), {error, _} = control_action(reset, []), @@ -953,13 +987,16 @@ test_cluster_management() -> ok = control_action(reset, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_disc_node(), ok = control_action(force_cluster, ["invalid1@invalid", "invalid2@invalid"]), + ok = assert_ram_node(), %% join a non-existing cluster as a ram node ok = control_action(reset, []), ok = control_action(force_cluster, ["invalid1@invalid", "invalid2@invalid"]), + ok = assert_ram_node(), SecondaryNode = rabbit_misc:makenode("hare"), case net_adm:ping(SecondaryNode) of @@ -978,15 +1015,18 @@ test_cluster_management2(SecondaryNode) -> %% make a disk node ok = control_action(reset, []), ok = control_action(cluster, [NodeS]), + ok = assert_disc_node(), %% make a ram node ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), + ok = assert_ram_node(), %% join cluster as a ram node ok = control_action(reset, []), ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_ram_node(), %% change cluster config while remaining in same cluster ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]), @@ -998,27 +1038,45 @@ test_cluster_management2(SecondaryNode) -> "invalid2@invalid"]), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_ram_node(), - %% join empty cluster as a ram node + %% join empty cluster as a ram node (converts to disc) ok = control_action(cluster, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_disc_node(), - %% turn ram node into disk node + %% make a new ram node ok = control_action(reset, []), + ok = control_action(force_cluster, [SecondaryNodeS]), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + ok = assert_ram_node(), + + %% turn ram node into disk node ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_disc_node(), %% convert a disk node into a ram node + ok = assert_disc_node(), ok = control_action(force_cluster, ["invalid1@invalid", "invalid2@invalid"]), + ok = assert_ram_node(), + + %% make a new disk node + ok = control_action(force_reset, []), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + ok = assert_disc_node(), %% turn a disk node into a ram node ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), + ok = assert_ram_node(), %% NB: this will log an inconsistent_database error, which is harmless %% Turning cover on / off is OK even if we're not in general using cover, @@ -1044,6 +1102,10 @@ test_cluster_management2(SecondaryNode) -> {error, {no_running_cluster_nodes, _, _}} = control_action(reset, []), + %% attempt to change type when no other node is alive + {error, {no_running_cluster_nodes, _, _}} = + control_action(cluster, [SecondaryNodeS]), + %% leave system clustered, with the secondary node as a ram node ok = control_action(force_reset, []), ok = control_action(start_app, []), @@ -1072,15 +1134,25 @@ test_user_management() -> control_action(list_permissions, [], [{"-p", "/testhost"}]), {error, {invalid_regexp, _, _}} = control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), + {error, {no_such_user, _}} = + control_action(set_user_tags, ["foo", "bar"]), %% user creation ok = control_action(add_user, ["foo", "bar"]), {error, {user_already_exists, _}} = control_action(add_user, ["foo", "bar"]), ok = control_action(change_password, ["foo", "baz"]), - ok = control_action(set_admin, ["foo"]), - ok = control_action(clear_admin, ["foo"]), - ok = control_action(list_users, []), + + TestTags = fun (Tags) -> + Args = ["foo" | [atom_to_list(T) || T <- Tags]], + ok = control_action(set_user_tags, Args), + {ok, #internal_user{tags = Tags}} = + rabbit_auth_backend_internal:lookup_user(<<"foo">>), + ok = control_action(list_users, []) + end, + TestTags([foo, bar, baz]), + TestTags([administrator]), + TestTags([]), %% vhost creation ok = control_action(add_vhost, ["/testhost"]), @@ -1203,10 +1275,10 @@ test_spawn() -> user(Username) -> #user{username = Username, - is_admin = true, + tags = [administrator], auth_backend = rabbit_auth_backend_internal, impl = #internal_user{username = Username, - is_admin = true}}. + tags = [administrator]}}. test_statistics_event_receiver(Pid) -> receive @@ -1215,7 +1287,7 @@ test_statistics_event_receiver(Pid) -> test_statistics_receive_event(Ch, Matcher) -> rabbit_channel:flush(Ch), - rabbit_channel:emit_stats(Ch), + Ch ! emit_stats, test_statistics_receive_event1(Ch, Matcher). test_statistics_receive_event1(Ch, Matcher) -> @@ -1572,6 +1644,18 @@ clean_logs(Files, Suffix) -> end || File <- Files], ok. +assert_ram_node() -> + case rabbit_mnesia:is_disc_node() of + true -> exit('not_ram_node'); + false -> ok + end. + +assert_disc_node() -> + case rabbit_mnesia:is_disc_node() of + true -> ok; + false -> exit('not_disc_node') + end. + delete_file(File) -> case file:delete(File) of ok -> ok; @@ -1663,6 +1747,10 @@ test_backing_queue() -> passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, MaxJournal, infinity), + %% We will have restarted the message store, and thus changed + %% the order of the children of rabbit_sup. This will cause + %% problems if there are subsequent failures - see bug 24262. + ok = restart_app(), passed; _ -> passed @@ -1902,6 +1990,10 @@ with_empty_test_queue(Fun) -> {0, Qi} = init_test_queue(), rabbit_queue_index:delete_and_terminate(Fun(Qi)). +restart_app() -> + rabbit:stop(), + rabbit:start(). + queue_index_publish(SeqIds, Persistent, Qi) -> Ref = rabbit_guid:guid(), MsgStore = case Persistent of @@ -2074,11 +2166,14 @@ test_queue_index() -> variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( - Q, Recover, fun nop/2, fun nop/2, fun nop/2, fun nop/1). + Q, Recover, fun nop/2, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> + variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). + +variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> lists:foldl( - fun (_N, VQN) -> + fun (N, VQN) -> rabbit_variable_queue:publish( rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), @@ -2086,7 +2181,7 @@ variable_queue_publish(IsPersistent, Count, VQ) -> true -> 2; false -> 1 end}, <<>>), - #message_properties{}, self(), VQN) + PropFun(N, #message_properties{}), self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2119,6 +2214,29 @@ with_fresh_variable_queue(Fun) -> _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), passed. +publish_and_confirm(QPid, Payload, Count) -> + Seqs = lists:seq(1, Count), + [begin + Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = 2}, + Payload), + Delivery = #delivery{mandatory = false, immediate = false, + sender = self(), message = Msg, msg_seq_no = Seq}, + true = rabbit_amqqueue:deliver(QPid, Delivery) + end || Seq <- Seqs], + wait_for_confirms(gb_sets:from_list(Seqs)). + +wait_for_confirms(Unconfirmed) -> + case gb_sets:is_empty(Unconfirmed) of + true -> ok; + false -> receive {'$gen_cast', {confirm, Confirmed, _}} -> + wait_for_confirms( + gb_sets:difference(Unconfirmed, + gb_sets:from_list(Confirmed))) + after 5000 -> exit(timeout_waiting_for_confirm) + end + end. + test_variable_queue() -> [passed = with_fresh_variable_queue(F) || F <- [fun test_variable_queue_dynamic_duration_change/1, @@ -2126,6 +2244,7 @@ test_variable_queue() -> fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, fun test_dropwhile/1, + fun test_dropwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1]], passed. @@ -2162,14 +2281,9 @@ test_dropwhile(VQ0) -> Count = 10, %% add messages with sequential expiry - VQ1 = lists:foldl( - fun (N, VQN) -> - rabbit_variable_queue:publish( - rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, self(), VQN) - end, VQ0, lists:seq(1, Count)), + VQ1 = variable_queue_publish( + false, Count, + fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages VQ2 = rabbit_variable_queue:dropwhile( @@ -2189,6 +2303,14 @@ test_dropwhile(VQ0) -> VQ4. +test_dropwhile_varying_ram_duration(VQ0) -> + VQ1 = variable_queue_publish(false, 1, VQ0), + VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), + VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2), + VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), + VQ5 = variable_queue_publish(false, 1, VQ4), + rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5). + test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -2308,17 +2430,10 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), - TxID = rabbit_guid:guid(), {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - [begin - Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, <<>>), - Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, - sender = self(), message = Msg}, - true = rabbit_amqqueue:deliver(QPid, Delivery) - end || _ <- lists:seq(1, Count)], - rabbit_amqqueue:commit_all([QPid], TxID, self()), + publish_and_confirm(QPid, <<>>, Count), + exit(QPid, kill), MRef = erlang:monitor(process, QPid), receive {'DOWN', MRef, process, QPid, _Info} -> ok @@ -2345,18 +2460,10 @@ test_variable_queue_delete_msg_store_files_callback() -> ok = restart_msg_store_empty(), {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - TxID = rabbit_guid:guid(), Payload = <<0:8388608>>, %% 1MB Count = 30, - [begin - Msg = rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, Payload), - Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, - sender = self(), message = Msg}, - true = rabbit_amqqueue:deliver(QPid, Delivery) - end || _ <- lists:seq(1, Count)], - rabbit_amqqueue:commit_all([QPid], TxID, self()), + publish_and_confirm(QPid, Payload, Count), + rabbit_amqqueue:set_ram_duration_target(QPid, 0), CountMinusOne = Count - 1, diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 1f0f8bbe..2db960ac 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -20,7 +20,7 @@ -ifdef(use_specs). --export_type([txn/0, maybe/1, info/0, infos/0, info_key/0, info_keys/0, +-export_type([maybe/1, info/0, infos/0, info_key/0, info_keys/0, message/0, msg_id/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, message_properties/0, @@ -73,16 +73,12 @@ -type(delivery() :: #delivery{mandatory :: boolean(), immediate :: boolean(), - txn :: maybe(txn()), sender :: pid(), message :: message()}). -type(message_properties() :: #message_properties{expiry :: pos_integer() | 'undefined', needs_confirming :: boolean()}). -%% this is really an abstract type, but dialyzer does not support them --type(txn() :: rabbit_guid:guid()). - -type(info_key() :: atom()). -type(info_keys() :: [info_key()]). @@ -124,7 +120,9 @@ auto_delete :: boolean(), exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), - pid :: rabbit_types:maybe(pid())}). + pid :: rabbit_types:maybe(pid()), + slave_pids :: [pid()], + mirror_nodes :: [node()] | 'undefined' | 'all'}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), @@ -139,14 +137,14 @@ -type(user() :: #user{username :: username(), - is_admin :: boolean(), + tags :: [atom()], auth_backend :: atom(), impl :: any()}). -type(internal_user() :: #internal_user{username :: username(), password_hash :: password_hash(), - is_admin :: boolean()}). + tags :: [atom()]}). -type(username() :: binary()). -type(password() :: binary()). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index a2abb1e5..9739f6b7 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -144,7 +144,7 @@ upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> AfterUs = rabbit_mnesia:read_previously_running_nodes(), - case {is_disc_node(), AfterUs} of + case {is_disc_node_legacy(), AfterUs} of {true, []} -> primary; {true, _} -> @@ -182,12 +182,6 @@ upgrade_mode(AllNodes) -> end end. -is_disc_node() -> - %% This is pretty ugly but we can't start Mnesia and ask it (will hang), - %% we can't look at the config file (may not include us even if we're a - %% disc node). - filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")). - die(Msg, Args) -> %% We don't throw or exit here since that gets thrown %% straight out into do_boot, generating an erl_crash.dump @@ -218,7 +212,7 @@ force_tables() -> secondary_upgrade(AllNodes) -> %% must do this before we wipe out schema - IsDiscNode = is_disc_node(), + IsDiscNode = is_disc_node_legacy(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), %% Note that we cluster with all nodes, rather than all disc nodes @@ -282,6 +276,14 @@ lock_filename() -> lock_filename(dir()). lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). backup_dir() -> dir() ++ "-upgrade-backup". +is_disc_node_legacy() -> + %% This is pretty ugly but we can't start Mnesia and ask it (will + %% hang), we can't look at the config file (may not include us + %% even if we're a disc node). We also can't use + %% rabbit_mnesia:is_disc_node/0 because that will give false + %% postivies on Rabbit up to 2.5.1. + filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")). + %% NB: we cannot use rabbit_log here since it may not have been %% started yet info(Msg, Args) -> error_logger:info_msg(Msg, Args). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index bba7f47d..8d26866b 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -16,7 +16,8 @@ -module(rabbit_upgrade_functions). --include("rabbit.hrl"). +%% If you are tempted to add include("rabbit.hrl"). here, don't. Using record +%% defs here leads to pain later. -compile([export_all]). @@ -29,6 +30,10 @@ -rabbit_upgrade({semi_durable_route, mnesia, []}). -rabbit_upgrade({exchange_event_serial, mnesia, []}). -rabbit_upgrade({trace_exchanges, mnesia, [internal_exchanges]}). +-rabbit_upgrade({user_admin_to_tags, mnesia, [user_to_internal_user]}). +-rabbit_upgrade({ha_mirrors, mnesia, []}). +-rabbit_upgrade({gm, mnesia, []}). +-rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}). %% ------------------------------------------------------------------- @@ -43,6 +48,10 @@ -spec(semi_durable_route/0 :: () -> 'ok'). -spec(exchange_event_serial/0 :: () -> 'ok'). -spec(trace_exchanges/0 :: () -> 'ok'). +-spec(user_admin_to_tags/0 :: () -> 'ok'). +-spec(ha_mirrors/0 :: () -> 'ok'). +-spec(gm/0 :: () -> 'ok'). +-spec(exchange_scratch/0 :: () -> 'ok'). -endif. @@ -121,6 +130,46 @@ trace_exchanges() -> VHost <- rabbit_vhost:list()], ok. +user_admin_to_tags() -> + transform( + rabbit_user, + fun({internal_user, Username, PasswordHash, true}) -> + {internal_user, Username, PasswordHash, [administrator]}; + ({internal_user, Username, PasswordHash, false}) -> + {internal_user, Username, PasswordHash, [management]} + end, + [username, password_hash, tags], internal_user). + +ha_mirrors() -> + Tables = [rabbit_queue, rabbit_durable_queue], + AddMirrorPidsFun = + fun ({amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid}) -> + {amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid, + [], undefined} + end, + [ ok = transform(T, + AddMirrorPidsFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, mirror_nodes]) + || T <- Tables ], + ok. + +gm() -> + create(gm_group, [{record_name, gm_group}, + {attributes, [name, version, members]}]). + +exchange_scratch() -> + ok = exchange_scratch(rabbit_exchange), + ok = exchange_scratch(rabbit_durable_exchange). + +exchange_scratch(Table) -> + transform( + Table, + fun ({exchange, Name, Type, Dur, AutoDel, Int, Args}) -> + {exchange, Name, Type, Dur, AutoDel, Int, Args, undefined} + end, + [name, type, durable, auto_delete, internal, arguments, scratch]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> @@ -142,11 +191,7 @@ create(Tab, TabDef) -> %% the exchange type registry or worker pool to be running by dint of %% not validating anything and assuming the exchange type does not %% require serialisation. +%% NB: this assumes the pre-exchange-scratch-space format declare_exchange(XName, Type) -> - X = #exchange{name = XName, - type = Type, - durable = true, - auto_delete = false, - internal = false, - arguments = []}, + X = {exchange, XName, Type, true, false, false, []}, ok = mnesia:dirty_write(rabbit_durable_exchange, X). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a167cca0..ea72de66 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,19 +16,18 @@ -module(rabbit_variable_queue). --export([init/4, terminate/2, delete_and_terminate/2, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, + dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/3, discard/3, + status/1, invoke/3, is_duplicate/2, discard/3, multiple_routing_keys/0]). -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/6]). +-export([start_msg_store/2, stop_msg_store/0, init/5]). %%---------------------------------------------------------------------------- %% Definitions: @@ -238,12 +237,10 @@ ram_ack_index, index_state, msg_store_clients, - on_sync, durable, transient_threshold, async_callback, - sync_callback, len, persistent_count, @@ -284,10 +281,6 @@ end_seq_id %% end_seq_id is exclusive }). --record(tx, { pending_messages, pending_acks }). - --record(sync, { acks_persistent, acks_all, pubs, funs }). - %% When we discover, on publish, that we should write some indices to %% disk for some betas, the IO_BATCH_SIZE sets the number of betas %% that we must be due to write indices for before we do any work at @@ -320,12 +313,6 @@ count :: non_neg_integer(), end_seq_id :: non_neg_integer() }). --type(sync() :: #sync { acks_persistent :: [[seq_id()]], - acks_all :: [[seq_id()]], - pubs :: [{message_properties_transformer(), - [rabbit_types:basic_message()]}], - funs :: [fun (() -> any())] }). - -type(state() :: #vqstate { q1 :: queue(), q2 :: bpqueue:bpqueue(), @@ -338,12 +325,10 @@ index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - on_sync :: sync(), durable :: boolean(), transient_threshold :: non_neg_integer(), async_callback :: async_callback(), - sync_callback :: sync_callback(), len :: non_neg_integer(), persistent_count :: non_neg_integer(), @@ -376,11 +361,6 @@ count = 0, end_seq_id = Z }). --define(BLANK_SYNC, #sync { acks_persistent = [], - acks_all = [], - pubs = [], - funs = [] }). - %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -409,17 +389,17 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(Queue, Recover, AsyncCallback, SyncCallback) -> - init(Queue, Recover, AsyncCallback, SyncCallback, +init(Queue, Recover, AsyncCallback) -> + init(Queue, Recover, AsyncCallback, fun (MsgIds, ActionTaken) -> msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) end, fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). init(#amqqueue { name = QueueName, durable = IsDurable }, false, - AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback, + init(IsDurable, IndexState, 0, [], AsyncCallback, case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -428,7 +408,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); init(#amqqueue { name = QueueName, durable = true }, true, - AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -449,14 +429,14 @@ init(#amqqueue { name = QueueName, durable = true }, true, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, + init(true, IndexState, DeltaCount, Terms1, AsyncCallback, PersistentClient, TransientClient). terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = - remove_pending_ack(true, tx_commit_index(State)), + remove_pending_ack(true, State), PRef = case MSCStateP of undefined -> undefined; _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), @@ -560,173 +540,35 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. dropwhile(Pred, State) -> - {_OkOrEmpty, State1} = dropwhile1(Pred, State), - a(State1). - -dropwhile1(Pred, State) -> - internal_queue_out( - fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> - case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, - State1), - dropwhile1(Pred, State2); - false -> {ok, in_r(MsgStatus, State1)} - end - end, State). - -in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, - State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> - true = queue:is_empty(Q4), %% ASSERTION - State #vqstate { - q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; -in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + case queue_out(State) of + {empty, State1} -> + a(State1); + {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> + case Pred(MsgProps) of + true -> {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, State2); + false -> a(in_r(MsgStatus, State1)) + end + end. fetch(AckRequired, State) -> - internal_queue_out( - fun(MsgStatus, State1) -> - %% it's possible that the message wasn't read from disk - %% at this point, so read it in. - {MsgStatus1, State2} = read_msg(MsgStatus, State1), - internal_fetch(AckRequired, MsgStatus1, State2) - end, State). - -internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> - case queue:out(Q4) of - {empty, _Q4} -> - case fetch_from_q3(State) of - {empty, State1} = Result -> a(State1), Result; - {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1) - end; - {{value, MsgStatus}, Q4a} -> - Fun(MsgStatus, State #vqstate { q4 = Q4a }) + case queue_out(State) of + {empty, State1} -> + {empty, a(State1)}; + {{value, MsgStatus}, State1} -> + %% it is possible that the message wasn't read from disk + %% at this point, so read it in. + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2), + {Res, a(State3)} end. -read_msg(MsgStatus = #msg_status { msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent }, - State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {MsgStatus #msg_status { msg = Msg }, - State #vqstate { ram_msg_count = RamMsgCount + 1, - msg_store_clients = MSCState1 }}; -read_msg(MsgStatus, State) -> - {MsgStatus, State}. - -internal_fetch(AckRequired, MsgStatus = #msg_status { - seq_id = SeqId, - msg_id = MsgId, - msg = Msg, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk }, - State = #vqstate {ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - msg_store_clients = MSCState, - len = Len, - persistent_count = PCount }) -> - %% 1. Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% 2. Remove from msg_store and queue index, if necessary - Rem = fun () -> - ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) - end, - Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of - {false, true, false, _} -> Rem(), IndexState1; - {false, true, true, _} -> Rem(), Ack(); - { true, true, true, false} -> Ack(); - _ -> IndexState1 - end, - - %% 3. If an ack is required, add something sensible to PA - {AckTag, State1} = case AckRequired of - true -> StateN = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - {SeqId, StateN}; - false -> {undefined, State} - end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, - RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - - {{Msg, IsDelivered, AckTag, Len1}, - a(State1 #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1 })}. - ack(AckTags, State) -> {MsgIds, State1} = ack(fun msg_store_remove/3, fun (_, State0) -> State0 end, AckTags, State), {MsgIds, a(State1)}. -tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - _ChPid, State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> - Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), - case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), - #msg_status { msg_on_disk = true } = - maybe_write_msg_to_disk(false, MsgStatus, MSCState); - false -> ok - end, - a(State). - -tx_ack(Txn, AckTags, State) -> - Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - State. - -tx_rollback(Txn, State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> - #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), - erase_tx(Txn), - ok = case IsDurable of - true -> msg_store_remove(MSCState, true, - persistent_msg_ids(Pubs)); - false -> ok - end, - {lists:append(AckTags), a(State)}. - -tx_commit(Txn, Fun, MsgPropsFun, - State = #vqstate { durable = IsDurable, - async_callback = AsyncCallback, - sync_callback = SyncCallback, - msg_store_clients = MSCState }) -> - #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), - erase_tx(Txn), - AckTags1 = lists:append(AckTags), - PersistentMsgIds = persistent_msg_ids(Pubs), - HasPersistentPubs = PersistentMsgIds =/= [], - {AckTags1, - a(case IsDurable andalso HasPersistentPubs of - true -> MsgStoreCallback = - fun () -> msg_store_callback( - PersistentMsgIds, Pubs, AckTags1, Fun, - MsgPropsFun, AsyncCallback, SyncCallback) - end, - ok = msg_store_sync(MSCState, true, PersistentMsgIds, - fun () -> spawn(MsgStoreCallback) end), - State; - false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, - Fun, MsgPropsFun, State) - end)}. - requeue(AckTags, MsgPropsFun, State) -> MsgPropsFun1 = fun (MsgProps) -> (MsgPropsFun(MsgProps)) #message_properties { @@ -832,23 +674,22 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_timeout(State = #vqstate { on_sync = OnSync }) -> - case {OnSync, needs_index_sync(State)} of - {?BLANK_SYNC, false} -> - case reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end; - _ -> - timed +needs_timeout(State) -> + case needs_index_sync(State) of + false -> case reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State) of + {true, _State} -> idle; + {false, _State} -> false + end; + true -> timed end. timeout(State) -> - a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))). + a(reduce_memory_use(confirm_commit_index(State))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -858,7 +699,6 @@ status(#vqstate { len = Len, pending_ack = PA, ram_ack_index = RAI, - on_sync = #sync { funs = From }, target_ram_count = TargetRamCount, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, @@ -875,7 +715,6 @@ status(#vqstate { {q4 , queue:len(Q4)}, {len , Len}, {pending_acks , dict:size(PA)}, - {outstanding_txns , length(From)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, {ram_ack_count , gb_trees:size(RAI)}, @@ -887,10 +726,9 @@ status(#vqstate { {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]. -invoke(?MODULE, Fun, State) -> - Fun(?MODULE, State). +invoke(?MODULE, Fun, State) -> Fun(?MODULE, State). -is_duplicate(_Txn, _Msg, State) -> {false, State}. +is_duplicate(_Msg, State) -> {false, State}. discard(_Msg, _ChPid, State) -> State. @@ -986,11 +824,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) -> MSCState, IsPersistent, fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). -msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:sync(MsgIds, Fun, MSCState1) end). - msg_store_close_fds(MSCState, IsPersistent) -> with_msg_store_state( MSCState, IsPersistent, @@ -1007,20 +840,6 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -lookup_tx(Txn) -> case get({txn, Txn}) of - undefined -> #tx { pending_messages = [], - pending_acks = [] }; - V -> V - end. - -store_tx(Txn, Tx) -> put({txn, Txn}, Tx). - -erase_tx(Txn) -> erase({txn, Txn}). - -persistent_msg_ids(Pubs) -> - [MsgId || {#basic_message { id = MsgId, - is_persistent = true }, _MsgProps} <- Pubs]. - betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( @@ -1084,8 +903,8 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, - AsyncCallback, SyncCallback, PersistentClient, TransientClient) -> +init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, + PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), @@ -1107,12 +926,10 @@ init(IsDurable, IndexState, DeltaCount, Terms, ram_ack_index = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, - on_sync = ?BLANK_SYNC, durable = IsDurable, transient_threshold = NextSeqId, async_callback = AsyncCallback, - sync_callback = SyncCallback, len = DeltaCount1, persistent_count = DeltaCount1, @@ -1141,87 +958,94 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. -msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, - AsyncCallback, SyncCallback) -> - case SyncCallback(?MODULE, - fun (?MODULE, StateN) -> - tx_commit_post_msg_store(true, Pubs, AckTags, - Fun, MsgPropsFun, StateN) - end) of - ok -> ok; - error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback) +in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, + State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> + case queue:is_empty(Q4) of + true -> State #vqstate { + q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), + ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; + false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = + read_msg(MsgStatus, State), + State1 #vqstate { q4 = queue:in_r(MsgStatus1, Q4a) } + end; +in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> + State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + +queue_out(State = #vqstate { q4 = Q4 }) -> + case queue:out(Q4) of + {empty, _Q4} -> + case fetch_from_q3(State) of + {empty, _State1} = Result -> Result; + {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} + end; + {{value, MsgStatus}, Q4a} -> + {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end. -remove_persistent_messages(MsgIds, AsyncCallback) -> - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, - undefined, AsyncCallback), - ok = rabbit_msg_store:remove(MsgIds, PersistentClient), - rabbit_msg_store:client_delete_and_terminate(PersistentClient). - -tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, - State = #vqstate { - on_sync = OnSync = #sync { - acks_persistent = SPAcks, - acks_all = SAcks, - pubs = SPubs, - funs = SFuns }, - pending_ack = PA, - durable = IsDurable }) -> - PersistentAcks = - case IsDurable of - true -> [AckTag || AckTag <- AckTags, - case dict:fetch(AckTag, PA) of - #msg_status {} -> - false; - {IsPersistent, _MsgId, _MsgProps} -> - IsPersistent - end]; - false -> [] +read_msg(MsgStatus = #msg_status { msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent }, + State = #vqstate { ram_msg_count = RamMsgCount, + msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {MsgStatus #msg_status { msg = Msg }, + State #vqstate { ram_msg_count = RamMsgCount + 1, + msg_store_clients = MSCState1 }}; +read_msg(MsgStatus, State) -> + {MsgStatus, State}. + +internal_fetch(AckRequired, MsgStatus = #msg_status { + seq_id = SeqId, + msg_id = MsgId, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State = #vqstate {ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount }) -> + %% 1. Mark it delivered if necessary + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + + %% 2. Remove from msg_store and queue index, if necessary + Rem = fun () -> + ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) + end, + Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, + IndexState2 = + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of + {false, true, false, _} -> Rem(), IndexState1; + {false, true, true, _} -> Rem(), Ack(); + { true, true, true, false} -> Ack(); + _ -> IndexState1 end, - case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of - true -> State #vqstate { - on_sync = #sync { - acks_persistent = [PersistentAcks | SPAcks], - acks_all = [AckTags | SAcks], - pubs = [{MsgPropsFun, Pubs} | SPubs], - funs = [Fun | SFuns] }}; - false -> State1 = tx_commit_index( - State #vqstate { - on_sync = #sync { - acks_persistent = [], - acks_all = [AckTags], - pubs = [{MsgPropsFun, Pubs}], - funs = [Fun] } }), - State1 #vqstate { on_sync = OnSync } - end. -tx_commit_index(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - State; -tx_commit_index(State = #vqstate { on_sync = #sync { - acks_persistent = SPAcks, - acks_all = SAcks, - pubs = SPubs, - funs = SFuns }, - durable = IsDurable }) -> - PAcks = lists:append(SPAcks), - Acks = lists:append(SAcks), - Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), - {Msg, MsgProps} <- lists:reverse(PubsN)], - {_MsgIds, State1} = ack(Acks, State), - {SeqIds, State2 = #vqstate { index_state = IndexState }} = - lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps}, - {SeqIdsAcc, State3}) -> - IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State4} = - publish(Msg, MsgProps, false, IsPersistent1, State3), - {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4} - end, {PAcks, State1}, Pubs), - IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), - [ Fun() || Fun <- lists:reverse(SFuns) ], - reduce_memory_use( - State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). + %% 3. If an ack is required, add something sensible to PA + {AckTag, State1} = case AckRequired of + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {undefined, State} + end, + + PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), + Len1 = Len - 1, + RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), + + {{Msg, IsDelivered, AckTag, Len1}, + State1 #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1 }}. purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, diff --git a/src/supervisor2.erl b/src/supervisor2.erl index ec1ee9cd..405949ef 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -76,7 +76,6 @@ %% Internal exports -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]). -export([handle_cast/2]). --export([delayed_restart/2]). -define(DICT, dict). @@ -157,9 +156,6 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> end; check_childspecs(X) -> {error, {badarg, X}}. -delayed_restart(Supervisor, RestartDetails) -> - gen_server:cast(Supervisor, {delayed_restart, RestartDetails}). - %%% --------------------------------------------------- %%% %%% Initialize the supervisor. @@ -355,12 +351,19 @@ handle_call(which_children, _From, State) -> State#state.children), {reply, Resp, State}. +%%% Hopefully cause a function-clause as there is no API function +%%% that utilizes cast. +handle_cast(null, State) -> + error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", + []), + + {noreply, State}. -handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) +handle_info({delayed_restart, {RestartType, Reason, Child}}, State) when ?is_simple(State) -> {ok, NState} = do_restart(RestartType, Reason, Child, State), {noreply, NState}; -handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> +handle_info({delayed_restart, {RestartType, Reason, Child}}, State) -> case get_child(Child#child.name, State) of {value, Child1} -> {ok, NState} = do_restart(RestartType, Reason, Child1, State), @@ -369,14 +372,6 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> {noreply, State} end; -%%% Hopefully cause a function-clause as there is no API function -%%% that utilizes cast. -handle_cast(null, State) -> - error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", - []), - - {noreply, State}. - %% %% Take care of terminated children. %% @@ -539,9 +534,9 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> {ok, NState} -> {ok, NState}; {terminate, NState} -> - {ok, _TRef} = timer:apply_after( - trunc(Delay*1000), ?MODULE, delayed_restart, - [self(), {{RestartType, Delay}, Reason, Child}]), + _TRef = erlang:send_after(trunc(Delay*1000), self(), + {delayed_restart, + {{RestartType, Delay}, Reason, Child}}), {ok, state_del_child(Child, NState)} end; do_restart(permanent, Reason, Child, State) -> |