diff options
36 files changed, 753 insertions, 390 deletions
@@ -93,7 +93,7 @@ all: $(TARGETS) $(DEPS_FILE): $(SOURCES) $(INCLUDES) rm -f $@ - escript generate_deps $(INCLUDE_DIR) $(SOURCE_DIR) \$$\(EBIN_DIR\) $@ + echo $(subst : ,:,$(foreach FILE,$^,$(FILE):)) | escript generate_deps $@ $(EBIN_DIR) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app escript generate_app $(EBIN_DIR) $@ < $< @@ -111,27 +111,23 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_8) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) - $(ERL_EBIN) -eval \ - "rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\")." \ - -eval \ - "init:stop()." - - + dialyzer --plt $(BASIC_PLT) --no_native \ + -Wrace_conditions $(BEAM_TARGETS) # rabbit.plt is used by rabbitmq-erlang-client's dialyze make target create-plt: $(RABBIT_PLT) $(RABBIT_PLT): $(BEAM_TARGETS) $(BASIC_PLT) - cp $(BASIC_PLT) $@ - $(ERL_EBIN) -eval \ - "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:add_to_plt(\"$@\", \"$(BEAM_TARGETS)\"))." + dialyzer --plt $(BASIC_PLT) --output_plt $@ --no_native \ + --add_to_plt $(BEAM_TARGETS) $(BASIC_PLT): $(BEAM_TARGETS) if [ -f $@ ]; then \ touch $@; \ else \ - $(ERL_EBIN) -eval \ - "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:create_basic_plt(\"$@\"))."; \ + dialyzer --output_plt $@ --build_plt \ + --apps erts kernel stdlib compiler sasl os_mon mnesia tools \ + public_key crypto ssl; \ fi clean: @@ -271,7 +267,9 @@ $(SOURCE_DIR)/%_usage.erl: docs_all: $(MANPAGES) $(WEB_MANPAGES) -install: all docs_all install_dirs +install: install_bin install_docs + +install_bin: all install_dirs cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR) chmod 0755 scripts/* @@ -279,14 +277,16 @@ install: all docs_all install_dirs cp scripts/$$script $(TARGET_DIR)/sbin; \ [ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \ done + mkdir -p $(TARGET_DIR)/plugins + echo Put your .ez plugin files in this directory. > $(TARGET_DIR)/plugins/README + +install_docs: docs_all install_dirs for section in 1 5; do \ mkdir -p $(MAN_DIR)/man$$section; \ for manpage in $(DOCS_DIR)/*.$$section.gz; do \ cp $$manpage $(MAN_DIR)/man$$section; \ done; \ done - mkdir -p $(TARGET_DIR)/plugins - echo Put your .ez plugin files in this directory. > $(TARGET_DIR)/plugins/README install_dirs: @ OK=true && \ @@ -345,9 +345,11 @@ def genErl(spec): print "%% Various types" print "-ifdef(use_specs)." - print """-export_type([amqp_table/0, amqp_property_type/0, amqp_method_record/0, - amqp_method_name/0, amqp_method/0, amqp_class_id/0, - amqp_value/0, amqp_array/0, amqp_exception/0, amqp_property_record/0]). + print """-export_type([amqp_field_type/0, amqp_property_type/0, + amqp_table/0, amqp_array/0, amqp_value/0, + amqp_method_name/0, amqp_method/0, amqp_method_record/0, + amqp_method_field_name/0, amqp_property_record/0, + amqp_exception/0, amqp_exception_code/0, amqp_class_id/0]). -type(amqp_field_type() :: 'longstr' | 'signedint' | 'decimal' | 'timestamp' | @@ -415,7 +417,7 @@ def genErl(spec): (amqp_method_name(), binary()) -> amqp_method_record() | rabbit_types:connection_exit()). -spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()). -spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()). --spec(encode_properties/1 :: (amqp_method_record()) -> binary()). +-spec(encode_properties/1 :: (amqp_property_record()) -> binary()). -spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}). -spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()). -endif. % use_specs diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 222fd905..392a479a 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -976,6 +976,11 @@ <listitem><para>Peer port.</para></listitem> </varlistentry> <varlistentry> + <term>ssl</term> + <listitem><para>Boolean indicating whether the + connection is secured with SSL.</para></listitem> + </varlistentry> + <varlistentry> <term>peer_cert_subject</term> <listitem><para>The subject of the peer's SSL certificate, in RFC4514 form.</para></listitem> diff --git a/generate_deps b/generate_deps index 29587b5a..ddfca816 100644 --- a/generate_deps +++ b/generate_deps @@ -2,18 +2,21 @@ %% -*- erlang -*- -mode(compile). -main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> - ErlDirContents = filelib:wildcard("*.erl", ErlDir), - ErlFiles = [filename:join(ErlDir, FileName) || FileName <- ErlDirContents], +%% We expect the list of Erlang source and header files to arrive on +%% stdin, with the entries colon-separated. +main([TargetFile, EbinDir]) -> + ErlsAndHrls = [ string:strip(S,left) || + S <- string:tokens(io:get_line(""), ":\n")], + ErlFiles = [F || F <- ErlsAndHrls, lists:suffix(".erl", F)], Modules = sets:from_list( [list_to_atom(filename:basename(FileName, ".erl")) || - FileName <- ErlDirContents]), - Headers = sets:from_list( - [filename:join(IncludeDir, FileName) || - FileName <- filelib:wildcard("*.hrl", IncludeDir)]), + FileName <- ErlFiles]), + HrlFiles = [F || F <- ErlsAndHrls, lists:suffix(".hrl", F)], + IncludeDirs = lists:usort([filename:dirname(Path) || Path <- HrlFiles]), + Headers = sets:from_list(HrlFiles), Deps = lists:foldl( fun (Path, Deps1) -> - dict:store(Path, detect_deps(IncludeDir, EbinDir, + dict:store(Path, detect_deps(IncludeDirs, EbinDir, Modules, Headers, Path), Deps1) end, dict:new(), ErlFiles), @@ -33,8 +36,8 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> ok = file:sync(Hdl), ok = file:close(Hdl). -detect_deps(IncludeDir, EbinDir, Modules, Headers, Path) -> - {ok, Forms} = epp:parse_file(Path, [IncludeDir], [{use_specs, true}]), +detect_deps(IncludeDirs, EbinDir, Modules, Headers, Path) -> + {ok, Forms} = epp:parse_file(Path, IncludeDirs, [{use_specs, true}]), lists:foldl( fun ({attribute, _LineNumber, Attribute, Behaviour}, Deps) when Attribute =:= behaviour orelse Attribute =:= behavior -> diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile index 3a22eef0..ee79c95a 100644 --- a/packaging/macports/Makefile +++ b/packaging/macports/Makefile @@ -1,7 +1,9 @@ -TARBALL_DIR=../../dist -TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) +TARBALL_SRC_DIR=../../dist +TARBALL_BIN_DIR=../../packaging/generic-unix/ +TARBALL_SRC=$(wildcard $(TARBALL_SRC_DIR)/rabbitmq-server-[0-9.]*.tar.gz) +TARBALL_BIN=$(wildcard $(TARBALL_BIN_DIR)/rabbitmq-server-generic-unix-[0-9.]*.tar.gz) COMMON_DIR=../common -VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') +VERSION=$(shell echo $(TARBALL_SRC) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') # The URL at which things really get deployed REAL_WEB_URL=http://www.rabbitmq.com/ @@ -23,10 +25,7 @@ dirs: mkdir -p $(DEST)/files $(DEST)/Portfile: Portfile.in - for algo in md5 sha1 rmd160 ; do \ - checksum=$$(openssl $$algo $(TARBALL_DIR)/$(TARBALL) | awk '{print $$NF}') ; \ - echo "s|@$$algo@|$$checksum|g" ; \ - done >checksums.sed + ./make-checksums.sh $(TARBALL_SRC) $(TARBALL_BIN) > checksums.sed sed -e "s|@VERSION@|$(VERSION)|g;s|@BASE_URL@|$(REAL_WEB_URL)|g" \ -f checksums.sed <$^ >$@ rm checksums.sed diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index e37a45b3..ce6b1e34 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -17,13 +17,19 @@ long_description \ homepage @BASE_URL@ master_sites @BASE_URL@releases/rabbitmq-server/v${version}/ +distfiles ${name}-${version}${extract.suffix} \ + ${name}-generic-unix-${version}${extract.suffix} + checksums \ - md5 @md5@ \ - sha1 @sha1@ \ - rmd160 @rmd160@ + ${name}-${version}${extract.suffix} md5 @md5-src@ \ + ${name}-${version}${extract.suffix} sha1 @sha1-src@ \ + ${name}-${version}${extract.suffix} rmd160 @rmd160-src@ \ + ${name}-generic-unix-${version}${extract.suffix} md5 @md5-bin@ \ + ${name}-generic-unix-${version}${extract.suffix} sha1 @sha1-bin@ \ + ${name}-generic-unix-${version}${extract.suffix} rmd160 @rmd160-bin@ depends_lib port:erlang -depends_build port:xmlto port:libxslt +depends_build port:libxslt platform darwin 7 { depends_build-append port:py25-simplejson @@ -49,11 +55,15 @@ set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server set sbindir ${destroot}${prefix}/lib/rabbitmq/bin set wrappersbin ${destroot}${prefix}/sbin set realsbin ${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version}/sbin +set mansrc ${workpath}/rabbitmq_server-${version}/share/man +set mandest ${destroot}${prefix}/share/man use_configure no use_parallel_build yes +destroot.target install_bin + destroot.destdir \ TARGET_DIR=${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version} \ SBIN_DIR=${sbindir} \ @@ -93,6 +103,11 @@ post-destroot { ${wrappersbin}/rabbitmq-multi file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl + + file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/ + file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/ + file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/ + file copy ${mansrc}/man5/rabbitmq.conf.5.gz ${mandest}/man5/ } pre-install { diff --git a/packaging/macports/make-checksums.sh b/packaging/macports/make-checksums.sh new file mode 100755 index 00000000..11424dfc --- /dev/null +++ b/packaging/macports/make-checksums.sh @@ -0,0 +1,14 @@ +#!/bin/bash +# NB: this script requires bash +tarball_src=$1 +tarball_bin=$2 +for type in src bin +do + tarball_var=tarball_${type} + tarball=${!tarball_var} + for algo in md5 sha1 rmd160 + do + checksum=$(openssl $algo ${tarball} | awk '{print $NF}') + echo "s|@$algo-$type@|$checksum|g" + done +done diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 230d1f2a..6e02b23e 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -177,7 +177,7 @@ format_status/2]). %% Internal exports --export([init_it/6, print_event/3]). +-export([init_it/6]). -import(error_logger, [format/2]). @@ -192,10 +192,13 @@ -ifdef(use_specs). --spec(handle_common_termination/3 :: - (any(), atom(), #gs2_state{}) -> no_return()). +-type(gs2_state() :: #gs2_state{}). --spec(hibernate/1 :: (#gs2_state{}) -> no_return()). +-spec(handle_common_termination/3 :: + (any(), atom(), gs2_state()) -> no_return()). +-spec(hibernate/1 :: (gs2_state()) -> no_return()). +-spec(pre_hibernate/1 :: (gs2_state()) -> no_return()). +-spec(system_terminate/4 :: (_, _, _, gs2_state()) -> no_return()). -endif. @@ -612,7 +615,7 @@ process_msg(Msg, _Msg when Debug =:= [] -> handle_msg(Msg, GS2State); _Msg -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, + Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}), handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }) end. @@ -838,13 +841,13 @@ handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod, time = Time1, debug = Debug1}); {noreply, NState} -> - Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + Debug1 = common_debug(Debug, fun print_event/3, Name, {noreply, NState}), loop(GS2State #gs2_state {state = NState, time = infinity, debug = Debug1}); {noreply, NState, Time1} -> - Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + Debug1 = common_debug(Debug, fun print_event/3, Name, {noreply, NState}), loop(GS2State #gs2_state {state = NState, time = Time1, @@ -866,13 +869,13 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, debug = Debug}) -> case Reply of {noreply, NState} -> - Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + Debug1 = common_debug(Debug, fun print_event/3, Name, {noreply, NState}), loop(GS2State #gs2_state { state = NState, time = infinity, debug = Debug1 }); {noreply, NState, Time1} -> - Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + Debug1 = common_debug(Debug, fun print_event/3, Name, {noreply, NState}), loop(GS2State #gs2_state { state = NState, time = Time1, @@ -894,7 +897,7 @@ handle_common_termination(Reply, Msg, GS2State) -> reply(Name, {To, Tag}, Reply, State, Debug) -> reply({To, Tag}, Reply), sys:handle_debug( - Debug, {?MODULE, print_event}, Name, {out, Reply, To, State}). + Debug, fun print_event/3, Name, {out, Reply, To, State}). %%----------------------------------------------------------------- @@ -903,10 +906,6 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> system_continue(Parent, Debug, GS2State) -> loop(GS2State #gs2_state { parent = Parent, debug = Debug }). --ifdef(use_specs). --spec system_terminate(_, _, _, [_]) -> no_return(). --endif. - system_terminate(Reason, _Parent, Debug, GS2State) -> terminate(Reason, [], GS2State #gs2_state { debug = Debug }). diff --git a/src/rabbit.erl b/src/rabbit.erl index 8c36a9f0..a1dd2c2e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -301,49 +301,38 @@ run_boot_step({StepName, Attributes}) -> ok end. -module_attributes(Module) -> - case catch Module:module_info(attributes) of - {'EXIT', {undef, [{Module, module_info, _} | _]}} -> - io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", - [Module]), - []; - {'EXIT', Reason} -> - exit(Reason); - V -> - V - end. - boot_steps() -> - AllApps = [App || {App, _, _} <- application:loaded_applications()], - Modules = lists:usort( - lists:append([Modules - || {ok, Modules} <- - [application:get_key(App, modules) - || App <- AllApps]])), - UnsortedSteps = - lists:flatmap(fun (Module) -> - [{StepName, Attributes} - || {rabbit_boot_step, [{StepName, Attributes}]} - <- module_attributes(Module)] - end, Modules), - sort_boot_steps(UnsortedSteps). + sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)). + +vertices(_Module, Steps) -> + [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps]. + +edges(_Module, Steps) -> + [case Key of + requires -> {StepName, OtherStep}; + enables -> {OtherStep, StepName} + end || {StepName, Atts} <- Steps, + {Key, OtherStep} <- Atts, + Key =:= requires orelse Key =:= enables]. + +graph_build_error({vertex, duplicate, StepName}) -> + boot_error("Duplicate boot step name: ~w~n", [StepName]); +graph_build_error({edge, Reason, From, To}) -> + boot_error( + "Could not add boot step dependency of ~w on ~w:~n~s", + [To, From, + case Reason of + {bad_vertex, V} -> + io_lib:format("Boot step not registered: ~w~n", [V]); + {bad_edge, [First | Rest]} -> + [io_lib:format("Cyclic dependency: ~w", [First]), + [io_lib:format(" depends on ~w", [Next]) || Next <- Rest], + io_lib:format(" depends on ~w~n", [First])] + end]). sort_boot_steps(UnsortedSteps) -> - G = digraph:new([acyclic]), - - %% Add vertices, with duplicate checking. - [case digraph:vertex(G, StepName) of - false -> digraph:add_vertex(G, StepName, Step); - _ -> boot_error("Duplicate boot step name: ~w~n", [StepName]) - end || Step = {StepName, _Attrs} <- UnsortedSteps], - - %% Add edges, detecting cycles and missing vertices. - lists:foreach(fun ({StepName, Attributes}) -> - [add_boot_step_dep(G, StepName, PrecedingStepName) - || {requires, PrecedingStepName} <- Attributes], - [add_boot_step_dep(G, SucceedingStepName, StepName) - || {enables, SucceedingStepName} <- Attributes] - end, UnsortedSteps), + G = rabbit_misc:build_acyclic_graph( + fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps), %% Use topological sort to find a consistent ordering (if there is %% one, otherwise fail). @@ -365,24 +354,6 @@ sort_boot_steps(UnsortedSteps) -> [MissingFunctions]) end. -add_boot_step_dep(G, RunsSecond, RunsFirst) -> - case digraph:add_edge(G, RunsSecond, RunsFirst) of - {error, Reason} -> - boot_error("Could not add boot step dependency of ~w on ~w:~n~s", - [RunsSecond, RunsFirst, - case Reason of - {bad_vertex, V} -> - io_lib:format("Boot step not registered: ~w~n", [V]); - {bad_edge, [First | Rest]} -> - [io_lib:format("Cyclic dependency: ~w", [First]), - [io_lib:format(" depends on ~w", [Next]) - || Next <- Rest], - io_lib:format(" depends on ~w~n", [First])] - end]); - _ -> - ok - end. - %%--------------------------------------------------------------------------- log_location(Type) -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 45df6cfb..1826347d 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -37,7 +37,7 @@ check_vhost_access/2, check_resource_access/3]). -export([add_user/2, delete_user/1, change_password/2, set_admin/1, clear_admin/1, list_users/0, lookup_user/1]). --export([change_password_hash/2]). +-export([change_password_hash/2, hash_password/1]). -export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). -export([set_permissions/5, clear_permissions/2, list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, @@ -47,7 +47,7 @@ -ifdef(use_specs). --export_type([username/0, password/0]). +-export_type([username/0, password/0, password_hash/0]). -type(permission_atom() :: 'configure' | 'read' | 'write'). -type(username() :: binary()). @@ -73,9 +73,10 @@ -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). -spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok'). +-spec(hash_password/1 :: (password()) -> password_hash()). -spec(set_admin/1 :: (username()) -> 'ok'). -spec(clear_admin/1 :: (username()) -> 'ok'). --spec(list_users/0 :: () -> [username()]). +-spec(list_users/0 :: () -> [{username(), boolean()}]). -spec(lookup_user/1 :: (username()) -> rabbit_types:ok(rabbit_types:user()) | rabbit_types:error('not_found')). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9d78bafa..5cdd0e3c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -97,14 +97,14 @@ -spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). --spec(info_keys/0 :: () -> [rabbit_types:info_key()]). --spec(info/1 :: (rabbit_types:amqqueue()) -> [rabbit_types:info()]). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/1 :: (rabbit_types:amqqueue()) -> rabbit_types:infos()). -spec(info/2 :: - (rabbit_types:amqqueue(), [rabbit_types:info_key()]) - -> [rabbit_types:info()]). --spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). --spec(info_all/2 :: (rabbit_types:vhost(), [rabbit_types:info_key()]) - -> [[rabbit_types:info()]]). + (rabbit_types:amqqueue(), rabbit_types:info_keys()) + -> rabbit_types:infos()). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). +-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) + -> [rabbit_types:infos()]). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean()}]). @@ -162,7 +162,7 @@ -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(maybe_expire/1 :: (pid()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()). +-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). -endif. @@ -361,7 +361,7 @@ consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers, infinity). consumers_all(VHostPath) -> - lists:concat( + lists:append( map(VHostPath, fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} || {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2c975b..75f285df 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -315,6 +315,25 @@ ch_record(ChPid) -> store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C). +maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, + acktags = ChAckTags, + txn = Txn, + unsent_message_count = UnsentMessageCount}) -> + case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of + {0, 0, 0, none} -> ok = erase_ch_record(C), + false; + _ -> store_ch_record(C), + true + end. + +erase_ch_record(#cr{ch_pid = ChPid, + limiter_pid = LimiterPid, + monitor_ref = MonitorRef}) -> + ok = rabbit_limiter:unregister(LimiterPid, self()), + erlang:demonitor(MonitorRef), + erase({ch, ChPid}), + ok. + all_ch_record() -> [C || {{ch, _}, C} <- get()]. @@ -361,7 +380,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, - store_ch_record(NewC), + true = maybe_store_ch_record(NewC), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), @@ -380,7 +399,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> - store_ch_record(C#cr{is_limit_active = true}), + true = maybe_store_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, @@ -479,7 +498,7 @@ possibly_unblock(State, ChPid, Update) -> State; C -> NewC = Update(C), - store_ch_record(NewC), + maybe_store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; unblock -> {NewBlockedConsumers, NewActiveConsumers} = @@ -500,10 +519,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> {ok, State}; - #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, - acktags = ChAckTags} -> - erlang:demonitor(MonitorRef), - erase({ch, ChPid}), + C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} -> + ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, _} -> none; @@ -562,7 +579,7 @@ commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), State#q{backing_queue_state = BQS1}. rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, @@ -772,8 +789,9 @@ handle_call({basic_get, ChPid, NoAck}, _From, {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( - C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); + true = maybe_store_ch_record( + C#cr{acktags = sets:add_element(AckTag, + ChAckTags)}); false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, @@ -791,8 +809,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), + true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter_pid = LimiterPid}), ok = case ConsumerCount of 0 -> rabbit_limiter:register(LimiterPid, self()); _ -> ok @@ -826,12 +844,15 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} -> - store_ch_record(C#cr{consumer_count = ConsumerCount - 1}), - case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()); - _ -> ok - end, + C = #cr{consumer_count = ConsumerCount, + limiter_pid = LimiterPid} -> + C1 = C#cr{consumer_count = ConsumerCount -1}, + maybe_store_ch_record( + case ConsumerCount of + 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()), + C1#cr{limiter_pid = undefined}; + _ -> C1 + end), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, @@ -880,7 +901,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(requeue_and_run(AckTags, State)) end; @@ -904,7 +925,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} end, - store_ch_record(C1), + maybe_store_ch_record(C1), noreply(State#q{backing_queue_state = BQS1}) end; @@ -915,7 +936,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); false -> BQS1 = BQ:ack(AckTags, BQS), diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 722573c7..b2997ae2 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -75,9 +75,12 @@ rabbit_types:encoded_content()). -spec(clear_encoded_content/1 :: (rabbit_types:content()) -> rabbit_types:unencoded_content()). --spec(map_exception/3 :: (non_neg_integer(), rabbit_types:amqp_error(), +-spec(map_exception/3 :: (rabbit_channel:channel_number(), + rabbit_types:amqp_error() | any(), rabbit_types:protocol()) -> - {boolean(), non_neg_integer(), rabbit_framing:amqp_method()}). + {boolean(), + rabbit_channel:channel_number(), + rabbit_framing:amqp_method_record()}). -endif. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 1af213c4..9d1399f7 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -60,7 +60,7 @@ rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). --opaque(deletions() :: dict:dictionary()). +-opaque(deletions() :: dict()). -spec(recover/0 :: () -> [rabbit_types:binding()]). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). @@ -78,13 +78,13 @@ -spec(list_for_source_and_destination/2 :: (rabbit_types:binding_source(), rabbit_types:binding_destination()) -> bindings()). --spec(info_keys/0 :: () -> [rabbit_types:info_key()]). --spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). --spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> - [rabbit_types:info()]). --spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). --spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) - -> [[rabbit_types:info()]]). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/1 :: (rabbit_types:binding()) -> rabbit_types:infos()). +-spec(info/2 :: (rabbit_types:binding(), rabbit_types:info_keys()) -> + rabbit_types:infos()). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). +-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys()) + -> [rabbit_types:infos()]). -spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). -spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). -spec(remove_for_destination/1 :: @@ -94,9 +94,9 @@ -spec(process_deletions/1 :: (deletions()) -> 'ok'). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). -spec(add_deletion/3 :: (rabbit_exchange:name(), - {'undefined' | rabbit_types:binding_source(), + {'undefined' | rabbit_types:exchange(), 'deleted' | 'not_deleted', - deletions()}, deletions()) -> deletions()). + bindings()}, deletions()) -> deletions()). -spec(new_deletions/0 :: () -> deletions()). -endif. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f4ff6253..b46bf1b4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -87,17 +87,17 @@ -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). --spec(send_command/2 :: (pid(), rabbit_framing:amqp_method()) -> 'ok'). +-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). --spec(info_keys/0 :: () -> [rabbit_types:info_key()]). --spec(info/1 :: (pid()) -> [rabbit_types:info()]). --spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). --spec(info_all/0 :: () -> [[rabbit_types:info()]]). --spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/1 :: (pid()) -> rabbit_types:infos()). +-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). +-spec(info_all/0 :: () -> [rabbit_types:infos()]). +-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). -endif. diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index b3821d3b..ff3995b5 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -52,21 +52,22 @@ start_link() -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), - {ok, ChannelSupSupPid} = - supervisor2:start_child( - SupPid, - {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), {ok, Collector} = supervisor2:start_child( SupPid, {collector, {rabbit_queue_collector, start_link, []}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), + {ok, ChannelSupSupPid} = + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), {ok, ReaderPid} = supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, + [ChannelSupSupPid, Collector, + rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. @@ -78,22 +79,3 @@ reader(Pid) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. -start_heartbeat_fun(SupPid) -> - fun (_Sock, 0) -> - none; - (Sock, TimeoutSec) -> - Parent = self(), - {ok, Sender} = - supervisor2:start_child( - SupPid, {heartbeat_sender, - {rabbit_heartbeat, start_heartbeat_sender, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {ok, Receiver} = - supervisor2:start_child( - SupPid, {heartbeat_receiver, - {rabbit_heartbeat, start_heartbeat_receiver, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {Sender, Receiver} - end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6b212745..6c0a727b 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -346,8 +346,6 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = Value) when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); -format_info_item([C|_] = Value) when is_number(C), C >= 32, C =< 255 -> - Value; format_info_item(Value) -> io_lib:format("~w", [Value]). diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl deleted file mode 100644 index a9806305..00000000 --- a/src/rabbit_dialyzer.erl +++ /dev/null @@ -1,92 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_dialyzer). - --export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, - halt_with_code/1]). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(create_basic_plt/1 :: (file:filename()) -> 'ok'). --spec(add_to_plt/2 :: (file:filename(), string()) -> 'ok'). --spec(dialyze_files/2 :: (file:filename(), string()) -> 'ok'). --spec(halt_with_code/1 :: (atom()) -> no_return()). - --endif. - -%%---------------------------------------------------------------------------- - -create_basic_plt(BasicPltPath) -> - OptsRecord = dialyzer_options:build( - [{analysis_type, plt_build}, - {output_plt, BasicPltPath}, - {files_rec, otp_apps_dependencies_paths()}]), - dialyzer_cl:start(OptsRecord), - ok. - -add_to_plt(PltPath, FilesString) -> - Files = string:tokens(FilesString, " "), - DialyzerWarnings = dialyzer:run([{analysis_type, plt_add}, - {init_plt, PltPath}, - {output_plt, PltPath}, - {files, Files}]), - print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1), - ok. - -dialyze_files(PltPath, ModifiedFiles) -> - Files = string:tokens(ModifiedFiles, " "), - DialyzerWarnings = dialyzer:run([{init_plt, PltPath}, - {files, Files}, - {warnings, [behaviours, - race_conditions]}]), - case DialyzerWarnings of - [] -> io:format("~nOk~n"); - _ -> io:format("~n~nFAILED with the following ~p warnings:~n~n", - [length(DialyzerWarnings)]), - print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1) - end, - ok. - -print_warnings(Warnings, FormatFun) -> - [io:format("~s~n", [FormatFun(W)]) || W <- Warnings], - io:format("~n"). - -otp_apps_dependencies_paths() -> - [code:lib_dir(App, ebin) || - App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]]. - -halt_with_code(ok) -> - halt(); -halt_with_code(fail) -> - halt(1). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 76199073..3a7eaaad 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -69,14 +69,14 @@ (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit()). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). --spec(info_keys/0 :: () -> [rabbit_types:info_key()]). --spec(info/1 :: (rabbit_types:exchange()) -> [rabbit_types:info()]). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: - (rabbit_types:exchange(), [rabbit_types:info_key()]) - -> [rabbit_types:info()]). --spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). --spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) - -> [[rabbit_types:info()]]). + (rabbit_types:exchange(), rabbit_types:info_keys()) + -> rabbit_types:infos()). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). +-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys()) + -> [rabbit_types:infos()]). -spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> {rabbit_router:routing_result(), [pid()]}). -spec(delete/2 :: diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl new file mode 100644 index 00000000..a0c8f4d5 --- /dev/null +++ b/src/rabbit_framing.erl @@ -0,0 +1,64 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +%% TODO auto-generate + +-module(rabbit_framing). + +-ifdef(use_specs). + +-export_type([protocol/0, + amqp_field_type/0, amqp_property_type/0, + amqp_table/0, amqp_array/0, amqp_value/0, + amqp_method_name/0, amqp_method/0, amqp_method_record/0, + amqp_method_field_name/0, amqp_property_record/0, + amqp_exception/0, amqp_exception_code/0, amqp_class_id/0]). + +-type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1'). + +-define(protocol_type(T), type(T :: rabbit_framing_amqp_0_8:T | + rabbit_framing_amqp_0_9_1:T)). + +-?protocol_type(amqp_field_type()). +-?protocol_type(amqp_property_type()). +-?protocol_type(amqp_table()). +-?protocol_type(amqp_array()). +-?protocol_type(amqp_value()). +-?protocol_type(amqp_method_name()). +-?protocol_type(amqp_method()). +-?protocol_type(amqp_method_record()). +-?protocol_type(amqp_method_field_name()). +-?protocol_type(amqp_property_record()). +-?protocol_type(amqp_exception()). +-?protocol_type(amqp_exception_code()). +-?protocol_type(amqp_class_id()). + +-endif. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index a9945af1..589bf7cc 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -32,7 +32,7 @@ -module(rabbit_heartbeat). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - pause_monitor/1, resume_monitor/1]). + start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). -include("rabbit.hrl"). @@ -41,16 +41,28 @@ -ifdef(use_specs). -export_type([heartbeaters/0]). +-export_type([start_heartbeat_fun/0]). --type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). +-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). + +-type(heartbeat_callback() :: fun (() -> any())). + +-type(start_heartbeat_fun() :: + fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> + no_return())). -spec(start_heartbeat_sender/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). -spec(start_heartbeat_receiver/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). +-spec(start_heartbeat_fun/1 :: + (pid()) -> start_heartbeat_fun()). + + -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -58,40 +70,60 @@ %%---------------------------------------------------------------------------- -start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater( {Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch rabbit_net:send( - Sock, rabbit_binary_generator:build_heartbeat_frame()), + SendFun(), continue end}). -start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> +start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - Parent ! timeout, + ReceiveFun(), stop end}). -pause_monitor(none) -> +start_heartbeat_fun(SupPid) -> + fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + {ok, Sender} = + start_heartbeater(SendTimeoutSec, SupPid, Sock, + SendFun, heartbeat_sender, + start_heartbeat_sender), + {ok, Receiver} = + start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + ReceiveFun, heartbeat_receiver, + start_heartbeat_receiver), + {Sender, Receiver} + end. + +pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. -resume_monitor(none) -> +resume_monitor({_Sender, none}) -> ok; resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok. %%---------------------------------------------------------------------------- +start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> + {ok, none}; +start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> + supervisor2:start_child( + SupPid, {Name, + {rabbit_heartbeat, Callback, + [Sock, TimeoutSec, TimeoutFun]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 671634c4..5a0532ea 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -52,7 +52,7 @@ -type(state() :: #iv_state { queue :: queue(), qname :: rabbit_amqqueue:name(), len :: non_neg_integer(), - pending_ack :: dict:dictionary() + pending_ack :: dict() }). -include("rabbit_backing_queue_spec.hrl"). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e5c30c06..0522afdc 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -64,6 +64,7 @@ -export([recursive_delete/1, dict_cons/3, orddict_cons/3, unlink_and_capture_exit/1]). -export([get_options/2]). +-export([all_module_attributes/1, build_acyclic_graph/4]). -export([now_ms/0]). -import(mnesia). @@ -83,6 +84,12 @@ -type(optdef() :: {flag, string()} | {option, string(), any()}). -type(channel_or_connection_exit() :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). +-type(digraph_label() :: term()). +-type(graph_vertex_fun() :: + fun ((atom(), [term()]) -> {digraph:vertex(), digraph_label()})). +-type(graph_edge_fun() :: + fun ((atom(), [term()]) -> {digraph:vertex(), digraph:vertex()})). +-type(graph_error_fun() :: fun ((any()) -> any() | no_return())). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -129,8 +136,8 @@ -spec(enable_cover/0 :: () -> ok_or_error()). -spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok'). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (file:filename()) -> ok_or_error()). --spec(report_cover/1 :: (file:filename()) -> 'ok'). +-spec(enable_cover/1 :: ([file:filename() | atom()]) -> ok_or_error()). +-spec(report_cover/1 :: ([file:filename() | atom()]) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -178,13 +185,15 @@ -spec(recursive_delete/1 :: ([file:filename()]) -> rabbit_types:ok_or_error({file:filename(), any()})). --spec(dict_cons/3 :: (any(), any(), dict:dictionary()) -> - dict:dictionary()). --spec(orddict_cons/3 :: (any(), any(), orddict:dictionary()) -> - orddict:dictionary()). +-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). +-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -spec(get_options/2 :: ([optdef()], [string()]) -> {[string()], [{string(), any()}]}). +-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). +-spec(build_acyclic_graph/4 :: (graph_vertex_fun(), graph_edge_fun(), + graph_error_fun(), [{atom(), [term()]}]) -> + digraph()). -spec(now_ms/0 :: () -> non_neg_integer()). -endif. @@ -270,29 +279,30 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> lists:flatten(io_lib:format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath])). -enable_cover() -> - enable_cover("."). +enable_cover() -> enable_cover(["."]). -enable_cover([Root]) when is_atom(Root) -> - enable_cover(atom_to_list(Root)); -enable_cover(Root) -> - case cover:compile_beam_directory(filename:join(Root, "ebin")) of - {error,Reason} -> {error,Reason}; - _ -> ok - end. +enable_cover(Dirs) -> + lists:foldl(fun (Dir, ok) -> + case cover:compile_beam_directory( + filename:join(lists:concat([Dir]),"ebin")) of + {error, _} = Err -> Err; + _ -> ok + end; + (_Dir, Err) -> + Err + end, ok, Dirs). start_cover(NodesS) -> {ok, _} = cover:start([makenode(N) || N <- NodesS]), ok. -report_cover() -> - report_cover("."). +report_cover() -> report_cover(["."]). + +report_cover(Dirs) -> [report_cover1(lists:concat([Dir])) || Dir <- Dirs], ok. -report_cover([Root]) when is_atom(Root) -> - report_cover(atom_to_list(Root)); -report_cover(Root) -> +report_cover1(Root) -> Dir = filename:join(Root, "cover"), - ok = filelib:ensure_dir(filename:join(Dir,"junk")), + ok = filelib:ensure_dir(filename:join(Dir, "junk")), lists:foreach(fun (F) -> file:delete(F) end, filelib:wildcard(filename:join(Dir, "*.html"))), {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]), @@ -726,3 +736,45 @@ get_flag(_, []) -> now_ms() -> timer:now_diff(now(), {0,0,0}) div 1000. + +module_attributes(Module) -> + case catch Module:module_info(attributes) of + {'EXIT', {undef, [{Module, module_info, _} | _]}} -> + io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", + [Module]), + []; + {'EXIT', Reason} -> + exit(Reason); + V -> + V + end. + +all_module_attributes(Name) -> + Modules = + lists:usort( + lists:append( + [Modules || {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), + lists:foldl( + fun (Module, Acc) -> + case lists:append([Atts || {N, Atts} <- module_attributes(Module), + N =:= Name]) of + [] -> Acc; + Atts -> [{Module, Atts} | Acc] + end + end, [], Modules). + + +build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) -> + G = digraph:new([acyclic]), + [ case digraph:vertex(G, Vertex) of + false -> digraph:add_vertex(G, Vertex, Label); + _ -> ErrorFun({vertex, duplicate, Vertex}) + end || {Module, Atts} <- Graph, + {Vertex, Label} <- VertexFun(Module, Atts) ], + [ case digraph:add_edge(G, From, To) of + {error, E} -> ErrorFun({edge, E, From, To}); + _ -> ok + end || {Module, Atts} <- Graph, + {From, To} <- EdgeFun(Module, Atts) ], + G. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 8de2f0d6..9d172269 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -44,9 +44,6 @@ -include("rabbit.hrl"). --define(SCHEMA_VERSION_SET, []). --define(SCHEMA_VERSION_FILENAME, "schema_version"). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -94,9 +91,6 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), - ok = rabbit_misc:write_term_file(filename:join( - dir(), ?SCHEMA_VERSION_FILENAME), - [?SCHEMA_VERSION_SET]), ok. is_db_empty() -> @@ -256,12 +250,12 @@ ensure_mnesia_dir() -> ensure_mnesia_running() -> case mnesia:system_info(is_running) of yes -> ok; - no -> throw({error, mnesia_not_running}) + no -> throw({error, mnesia_not_running}) end. ensure_mnesia_not_running() -> case mnesia:system_info(is_running) of - no -> ok; + no -> ok; yes -> throw({error, mnesia_unexpectedly_running}) end. @@ -378,28 +372,31 @@ init_db(ClusterNodes, Force) -> end; _ -> ok end, - case Nodes of - [] -> - case mnesia:system_info(use_dir) of - true -> - case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - %% NB: we cannot use rabbit_log here since - %% it may not have been started yet - error_logger:warning_msg( - "schema integrity check failed: ~p~n" - "moving database to backup location " - "and recreating schema from scratch~n", - [Reason]), - ok = move_db(), - ok = create_schema() - end; - false -> - ok = create_schema() + case {Nodes, mnesia:system_info(use_dir), + mnesia:system_info(db_nodes)} of + {[], true, [_]} -> + %% True single disc node, attempt upgrade + wait_for_tables(), + case rabbit_upgrade:maybe_upgrade() of + ok -> + schema_ok_or_exit(); + version_not_available -> + schema_ok_or_move() end; - [_|_] -> + {[], true, _} -> + %% "Master" (i.e. without config) disc node in cluster, + %% verify schema + wait_for_tables(), + version_ok_or_exit(rabbit_upgrade:read_version()), + schema_ok_or_exit(); + {[], false, _} -> + %% First RAM node in cluster, start from scratch + ok = create_schema(); + {[AnotherNode|_], _, _} -> + %% Subsequent node in cluster, catch up + version_ok_or_exit(rabbit_upgrade:read_version()), + version_ok_or_exit( + rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -408,7 +405,7 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - ok = ensure_schema_integrity() + schema_ok_or_exit() end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -418,6 +415,39 @@ init_db(ClusterNodes, Force) -> ClusterNodes, Reason}}) end. +schema_ok_or_move() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + %% NB: we cannot use rabbit_log here since it may not have been + %% started yet + error_logger:warning_msg("schema integrity check failed: ~p~n" + "moving database to backup location " + "and recreating schema from scratch~n", + [Reason]), + ok = move_db(), + ok = create_schema() + end. + +version_ok_or_exit({ok, DiscVersion}) -> + case rabbit_upgrade:desired_version() of + DiscVersion -> + ok; + DesiredVersion -> + exit({schema_mismatch, DesiredVersion, DiscVersion}) + end; +version_ok_or_exit({error, _}) -> + ok = rabbit_upgrade:write_version(). + +schema_ok_or_exit() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + exit({schema_invalid, Reason}) + end. + create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), @@ -426,7 +456,8 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = wait_for_tables(). + ok = wait_for_tables(), + ok = rabbit_upgrade:write_version(). move_db() -> mnesia:stop(), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 682a7faa..fd84109b 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -130,7 +130,7 @@ -type(client_msstate() :: #client_msstate { server :: server(), client_ref :: client_ref(), - file_handle_cache :: dict:dictionary(), + file_handle_cache :: dict(), index_state :: any(), index_module :: atom(), dir :: file:filename(), @@ -164,8 +164,8 @@ -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> - non_neg_integer()). --spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> non_neg_integer()). + 'ok'). +-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> 'ok'). -endif. diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 53d0d5cb..0940dce2 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -34,7 +34,7 @@ -export([async_recv/3, close/1, controlling_process/2, getstat/2, peername/1, peercert/1, port_command/2, - send/2, sockname/1]). + send/2, sockname/1, is_ssl/1]). %%--------------------------------------------------------------------------- @@ -65,6 +65,7 @@ -spec(sockname/1 :: (socket()) -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). +-spec(is_ssl/1 :: (socket()) -> boolean()). -spec(getstat/2 :: (socket(), [stat_option()]) -> ok_val_or_error([{stat_option(), integer()}])). @@ -133,3 +134,6 @@ sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). + +is_ssl(Sock) -> + ?IS_SSL(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 03a9b386..1c542ffe 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -67,21 +67,21 @@ -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok'). --spec(start_ssl_listener/3 :: (hostname(), ip_port(), [rabbit_types:info()]) +-spec(start_ssl_listener/3 :: (hostname(), ip_port(), rabbit_types:infos()) -> 'ok'). -spec(stop_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok'). -spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). -spec(connections/0 :: () -> [rabbit_types:connection()]). --spec(connection_info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(connection_info/1 :: - (rabbit_types:connection()) -> [rabbit_types:info()]). + (rabbit_types:connection()) -> rabbit_types:infos()). -spec(connection_info/2 :: - (rabbit_types:connection(), [rabbit_types:info_key()]) - -> [rabbit_types:info()]). --spec(connection_info_all/0 :: () -> [[rabbit_types:info()]]). + (rabbit_types:connection(), rabbit_types:info_keys()) + -> rabbit_types:infos()). +-spec(connection_info_all/0 :: () -> [rabbit_types:infos()]). -spec(connection_info_all/1 :: - ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). + (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(close_connection/2 :: (pid(), string()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(check_tcp_listener_address/3 :: diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 28d0b47d..bde9b3d3 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -190,7 +190,7 @@ unacked :: non_neg_integer() })). -type(seq_id() :: integer()). --type(seg_dict() :: {dict:dictionary(), [segment()]}). +-type(seg_dict() :: {dict(), [segment()]}). -type(qistate() :: #qistate { dir :: file:filename(), segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 127467bb..12730ccf 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -65,7 +65,7 @@ -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). --define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, +-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, protocol, user, vhost, timeout, frame_max, @@ -162,28 +162,25 @@ -ifdef(use_specs). --type(start_heartbeat_fun() :: - fun ((rabbit_networking:socket(), non_neg_integer()) -> - rabbit_heartbeat:heartbeaters())). - --spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> +-spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> rabbit_types:ok(pid())). --spec(info_keys/0 :: () -> [rabbit_types:info_key()]). --spec(info/1 :: (pid()) -> [rabbit_types:info()]). --spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). +-spec(info/1 :: (pid()) -> rabbit_types:infos()). +-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) + -> no_return()). -spec(start_connection/7 :: - (pid(), pid(), pid(), start_heartbeat_fun(), any(), - rabbit_networking:socket(), - fun ((rabbit_networking:socket()) -> + (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), + rabbit_net:socket(), + fun ((rabbit_net:socket()) -> rabbit_types:ok_or_error2( - rabbit_networking:socket(), any()))) -> no_return()). + rabbit_net:socket(), any()))) -> no_return()). -endif. @@ -771,7 +768,19 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = SHF(Sock, ClientHeartbeat), + SendFun = + fun() -> + Frame = rabbit_binary_generator:build_heartbeat_frame(), + catch rabbit_net:send(Sock, Frame) + end, + + Parent = self(), + ReceiveFun = + fun() -> + Parent ! timeout + end, + Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, + ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, @@ -839,6 +848,8 @@ i(peer_address, #v1{sock = Sock}) -> socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock); i(peer_port, #v1{sock = Sock}) -> socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock); +i(ssl, #v1{sock = Sock}) -> + rabbit_net:is_ssl(Sock); i(peer_cert_issuer, #v1{sock = Sock}) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock); i(peer_cert_subject, #v1{sock = Sock}) -> @@ -889,7 +900,7 @@ cert_info(F, Sock) -> case rabbit_net:peercert(Sock) of nossl -> ''; {error, no_peercert} -> ''; - {ok, Cert} -> F(Cert) + {ok, Cert} -> list_to_binary(F(Cert)) end. %%-------------------------------------------------------------------------- diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index 4335dd2e..5b905682 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -85,8 +85,8 @@ peer_cert_validity(Cert) -> cert_info(F, Cert) -> F(case public_key:pkix_decode_cert(Cert, otp) of - {ok, DecCert} -> DecCert; - DecCert -> DecCert + {ok, DecCert} -> DecCert; %%pre R14B + DecCert -> DecCert %%R14B onwards end). %%-------------------------------------------------------------------------- diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index edc73f70..b9993823 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -35,7 +35,8 @@ -ifdef(use_specs). --export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0, +-export_type([txn/0, maybe/1, info/0, infos/0, info_key/0, info_keys/0, + message/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, message_properties/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, @@ -95,7 +96,10 @@ -type(txn() :: rabbit_guid:guid()). -type(info_key() :: atom()). +-type(info_keys() :: [info_key()]). + -type(info() :: {info_key(), any()}). +-type(infos() :: [info()]). -type(amqp_error() :: #amqp_error{name :: rabbit_framing:amqp_exception(), @@ -143,7 +147,7 @@ -type(connection() :: pid()). --type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1'). +-type(protocol() :: rabbit_framing:protocol()). -type(user() :: #user{username :: rabbit_access_control:username(), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl new file mode 100644 index 00000000..0071a08a --- /dev/null +++ b/src/rabbit_upgrade.erl @@ -0,0 +1,156 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are Rabbit Technologies Ltd. +%% +%% Copyright (C) 2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_upgrade). + +-export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). + +-include("rabbit.hrl"). + +-define(VERSION_FILENAME, "schema_version"). +-define(LOCK_FILENAME, "schema_upgrade_lock"). + +%% ------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). +-spec(read_version/0 :: + () -> {'ok', [any()]} | rabbit_types:error(any())). +-spec(write_version/0 :: () -> 'ok'). +-spec(desired_version/0 :: () -> [atom()]). + +-endif. + +%% ------------------------------------------------------------------- + +%% Try to upgrade the schema. If no information on the existing schema +%% could be found, do nothing. rabbit_mnesia:check_schema_integrity() +%% will catch the problem. +maybe_upgrade() -> + case read_version() of + {ok, CurrentHeads} -> + G = load_graph(), + case unknown_heads(CurrentHeads, G) of + [] -> + case upgrades_to_apply(CurrentHeads, G) of + [] -> ok; + Upgrades -> apply_upgrades(Upgrades) + end; + Unknown -> + exit({future_upgrades_found, Unknown}) + end, + true = digraph:delete(G), + ok; + {error, enoent} -> + version_not_available + end. + +read_version() -> + case rabbit_misc:read_term_file(schema_filename()) of + {ok, [Heads]} -> {ok, Heads}; + {error, E} -> {error, E} + end. + +write_version() -> + ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), + ok. + +desired_version() -> + G = load_graph(), + Version = heads(G), + true = digraph:delete(G), + Version. + +%% ------------------------------------------------------------------- + +load_graph() -> + Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade), + rabbit_misc:build_acyclic_graph( + fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades). + +vertices(Module, Steps) -> + [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. + +edges(_Module, Steps) -> + [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. + +graph_build_error({vertex, duplicate, StepName}) -> + exit({duplicate_upgrade, StepName}); +graph_build_error({edge, E, From, To}) -> + exit({E, From, To}). + +unknown_heads(Heads, G) -> + [H || H <- Heads, digraph:vertex(G, H) =:= false]. + +upgrades_to_apply(Heads, G) -> + %% Take all the vertices which can reach the known heads. That's + %% everything we've already applied. Subtract that from all + %% vertices: that's what we have to apply. + Unsorted = sets:to_list( + sets:subtract( + sets:from_list(digraph:vertices(G)), + sets:from_list(digraph_utils:reaching(Heads, G)))), + %% Form a subgraph from that list and find a topological ordering + %% so we can invoke them in order. + [element(2, digraph:vertex(G, StepName)) + || StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + +heads(G) -> + lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). + +%% ------------------------------------------------------------------- + +apply_upgrades(Upgrades) -> + LockFile = lock_filename(), + case file:open(LockFile, [write, exclusive]) of + {ok, Lock} -> + ok = file:close(Lock), + info("Upgrades: ~w to apply~n", [length(Upgrades)]), + [apply_upgrade(Upgrade) || Upgrade <- Upgrades], + info("Upgrades: All applied~n", []), + ok = write_version(), + ok = file:delete(LockFile); + {error, eexist} -> + exit(previous_upgrade_failed); + {error, _} = Error -> + exit(Error) + end. + +apply_upgrade({M, F}) -> + info("Upgrades: Applying ~w:~w~n", [M, F]), + ok = apply(M, F, []). + +%% ------------------------------------------------------------------- + +schema_filename() -> + filename:join(dir(), ?VERSION_FILENAME). + +lock_filename() -> + filename:join(dir(), ?LOCK_FILENAME). + +%% NB: we cannot use rabbit_log here since it may not have been +%% started yet +info(Msg, Args) -> + error_logger:info_msg(Msg, Args). + +dir() -> + rabbit_mnesia:dir(). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl new file mode 100644 index 00000000..2ba885d0 --- /dev/null +++ b/src/rabbit_upgrade_functions.erl @@ -0,0 +1,88 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are Rabbit Technologies Ltd. +%% +%% Copyright (C) 2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-module(rabbit_upgrade_functions). + +-include("rabbit.hrl"). + +-compile([export_all]). + +-rabbit_upgrade({remove_user_scope, []}). +-rabbit_upgrade({hash_passwords, []}). +-rabbit_upgrade({add_ip_to_listener, []}). +-rabbit_upgrade({add_internal_to_exchange, []}). + +%% ------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(remove_user_scope/0 :: () -> 'ok'). +-spec(hash_passwords/0 :: () -> 'ok'). +-spec(add_ip_to_listener/0 :: () -> 'ok'). +-spec(add_internal_to_exchange/0 :: () -> 'ok'). + +-endif. + +%%-------------------------------------------------------------------- + +%% It's a bad idea to use records or record_info here, even for the +%% destination form. Because in the future, the destination form of +%% your current transform may not match the record any more, and it +%% would be messy to have to go back and fix old transforms at that +%% point. + +remove_user_scope() -> + mnesia( + rabbit_user_permission, + fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) -> + {user_permission, UV, {permission, Conf, Write, Read}} + end, + [user_vhost, permission]). + +hash_passwords() -> + mnesia( + rabbit_user, + fun ({user, Username, Password, IsAdmin}) -> + Hash = rabbit_access_control:hash_password(Password), + {user, Username, Hash, IsAdmin} + end, + [username, password_hash, is_admin]). + +add_ip_to_listener() -> + mnesia( + rabbit_listener, + fun ({listener, Node, Protocol, Host, Port}) -> + {listener, Node, Protocol, Host, {0,0,0,0}, Port} + end, + [node, protocol, host, ip_address, port]). + +add_internal_to_exchange() -> + mnesia( + rabbit_exchange, + fun ({exchange, Name, Type, Durable, AutoDelete, Args}) -> + {exchange, Name, Type, Durable, AutoDelete, false, Args} + end, + [name, type, durable, auto_delete, internal, arguments]). + +%%-------------------------------------------------------------------- + +mnesia(TableName, Fun, FieldList) -> + {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), + ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f88e49c2..69d62fde 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -305,7 +305,7 @@ q3 :: bpqueue:bpqueue(), q4 :: queue(), next_seq_id :: seq_id(), - pending_ack :: dict:dictionary(), + pending_ack :: dict(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index aa986e54..50bca390 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -62,9 +62,10 @@ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). -spec(send_command_sync/2 :: - (pid(), rabbit_framing:amqp_method()) -> 'ok'). + (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command_sync/3 :: - (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok'). + (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) + -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 93adfcb1..46bab31d 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -357,8 +357,7 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) when ?is_simple(State) -> {ok, NState} = do_restart(RestartType, Reason, Child, State), {noreply, NState}; -handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) - when not (?is_simple(State)) -> +handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> case get_child(Child#child.name, State) of {value, Child} -> {ok, NState} = do_restart(RestartType, Reason, Child, State), |