diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-06-06 11:23:53 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-06-06 11:23:53 +0100 |
commit | a6c04df9b55944ec8f947d77697acf31dfd3626a (patch) | |
tree | d00327545097c2527ec5a7c630b225c2ac7708e7 | |
parent | 0a4c43d10e88004e3912a91ee7be8efa8e50edc7 (diff) | |
parent | 4bfbabac01f9f790d91f241e5db6fcdc03fbe8e7 (diff) | |
download | rabbitmq-server-a6c04df9b55944ec8f947d77697acf31dfd3626a.tar.gz |
Merge bug24128
-rw-r--r-- | Makefile | 10 | ||||
-rw-r--r-- | docs/examples-to-end.xsl | 4 | ||||
-rw-r--r-- | docs/usage.xsl | 4 | ||||
-rw-r--r-- | generate_app | 18 | ||||
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 4 | ||||
-rw-r--r-- | src/gm_soak_test.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 18 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 19 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 35 | ||||
-rw-r--r-- | src/rabbit_control.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 21 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 24 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 14 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 6 |
17 files changed, 128 insertions, 79 deletions
@@ -93,8 +93,8 @@ $(DEPS_FILE): $(SOURCES) $(INCLUDES) rm -f $@ echo $(subst : ,:,$(foreach FILE,$^,$(FILE):)) | escript generate_deps $@ $(EBIN_DIR) -$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app - escript generate_app $(EBIN_DIR) $@ < $< +$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(SOURCES) generate_app + escript generate_app $< $@ $(SOURCE_DIR) $(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl | $(DEPS_FILE) erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< @@ -233,7 +233,7 @@ distclean: clean # xmlto can not read from standard input, so we mess with a tmp file. %.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \ - xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ + xsltproc --novalid $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ xmlto -o $(DOCS_DIR) $$opt man $<.tmp && \ gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp @@ -242,7 +242,7 @@ distclean: clean # Do not fold the cp into previous line, it's there to stop the file being # generated but empty if we fail $(SOURCE_DIR)/%_usage.erl: - xsltproc --stringparam modulename "`basename $@ .erl`" \ + xsltproc --novalid --stringparam modulename "`basename $@ .erl`" \ $(DOCS_DIR)/usage.xsl $< > $@.tmp sed -e 's/"/\\"/g' -e 's/%QUOTE%/"/g' $@.tmp > $@.tmp2 fold -s $@.tmp2 > $@.tmp3 @@ -256,7 +256,7 @@ $(SOURCE_DIR)/%_usage.erl: xmlto xhtml-nochunks `basename $< .xml`.xml ; rm `basename $< .xml`.xml cat `basename $< .xml`.html | \ xsltproc --novalid $(DOCS_DIR)/remove-namespaces.xsl - | \ - xsltproc --stringparam original `basename $<` $(DOCS_DIR)/html-to-website-xml.xsl - | \ + xsltproc --novalid --stringparam original `basename $<` $(DOCS_DIR)/html-to-website-xml.xsl - | \ xmllint --format - > $@ rm `basename $< .xml`.html diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl index d9686ada..a0a74178 100644 --- a/docs/examples-to-end.xsl +++ b/docs/examples-to-end.xsl @@ -1,9 +1,5 @@ <?xml version='1.0'?> <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" - xmlns:exsl="http://exslt.org/common" - xmlns:ng="http://docbook.org/docbook-ng" - xmlns:db="http://docbook.org/ns/docbook" - exclude-result-prefixes="exsl ng db" version='1.0'> <xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" /> diff --git a/docs/usage.xsl b/docs/usage.xsl index a6cebd93..586f8303 100644 --- a/docs/usage.xsl +++ b/docs/usage.xsl @@ -1,9 +1,5 @@ <?xml version='1.0'?> <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" - xmlns:exsl="http://exslt.org/common" - xmlns:ng="http://docbook.org/docbook-ng" - xmlns:db="http://docbook.org/ns/docbook" - exclude-result-prefixes="exsl" version='1.0'> <xsl:param name="modulename"/> diff --git a/generate_app b/generate_app index 576b485e..fb0eb1ea 100644 --- a/generate_app +++ b/generate_app @@ -1,12 +1,16 @@ #!/usr/bin/env escript %% -*- erlang -*- -main([BeamDir, TargetFile]) -> - Modules = [list_to_atom(filename:basename(F, ".beam")) || - F <- filelib:wildcard("*.beam", BeamDir)], - {ok, {application, Application, Properties}} = io:read(''), - NewProperties = lists:keyreplace(modules, 1, Properties, - {modules, Modules}), +main([InFile, OutFile | SrcDirs]) -> + Modules = [list_to_atom(filename:basename(F, ".erl")) || + SrcDir <- SrcDirs, + F <- filelib:wildcard("*.erl", SrcDir)], + {ok, [{application, Application, Properties}]} = file:consult(InFile), + NewProperties = + case proplists:get_value(modules, Properties) of + [] -> lists:keyreplace(modules, 1, Properties, {modules, Modules}); + _ -> Properties + end, file:write_file( - TargetFile, + OutFile, io_lib:format("~p.~n", [{application, Application, NewProperties}])). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 1c2b94e2..295d9039 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -32,8 +32,8 @@ -spec(stop/0 :: () -> 'ok'). -spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(), async_callback(), sync_callback()) -> state()). --spec(terminate/1 :: (state()) -> state()). --spec(delete_and_terminate/1 :: (state()) -> state()). +-spec(terminate/2 :: (any(), state()) -> state()). +-spec(delete_and_terminate/2 :: (any(), state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). -spec(publish/4 :: (rabbit_types:basic_message(), rabbit_types:message_properties(), pid(), state()) -> diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index dae42ac7..5e5a3a5a 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -80,12 +80,12 @@ handle_msg([], From, {test_msg, Num}) -> {ok, Num} -> ok; {ok, Num1} when Num < Num1 -> exit({{from, From}, - {duplicate_delivery_of, Num1}, - {expecting, Num}}); + {duplicate_delivery_of, Num}, + {expecting, Num1}}); {ok, Num1} -> exit({{from, From}, - {missing_delivery_of, Num}, - {received_early, Num1}}); + {received_early, Num}, + {expecting, Num1}}); error -> exit({{from, From}, {received_premature_delivery, Num}}) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f9ed3edc..c8703740 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -197,12 +197,12 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q = start_queue_process(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, + Q = start_queue_process(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, exclusive_owner = Owner, - pid = none}), + pid = none}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -487,11 +487,11 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, + #amqqueue{name = QueueName, + durable = false, auto_delete = false, - arguments = [], - pid = Pid}. + arguments = [], + pid = Pid}. safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8091e2c2..07a24af8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -97,12 +97,11 @@ info_keys() -> ?INFO_KEYS. init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, BQ} = application:get_env(backing_queue_module), {ok, #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, has_had_consumers = false, - backing_queue = BQ, + backing_queue = backing_queue_module(Q), backing_queue_state = undefined, active_consumers = queue:new(), blocked_consumers = queue:new(), @@ -115,16 +114,16 @@ init(Q) -> msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(shutdown, State = #q{backing_queue = BQ}) -> - terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); -terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> - terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); -terminate(_Reason, State = #q{backing_queue = BQ}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +terminate(Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> rabbit_event:notify( queue_deleted, [{pid, self()}]), - BQS1 = BQ:delete_and_terminate(BQS), + BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete %% doesn't return 'ok'. rabbit_amqqueue:internal_delete(qname(State)), @@ -226,6 +225,10 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. +backing_queue_module(#amqqueue{}) -> + {ok, BQM} = application:get_env(backing_queue_module), + BQM. + ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index addaabc5..217ad3eb 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -49,11 +49,11 @@ behaviour_info(callbacks) -> {init, 4}, %% Called on queue shutdown when queue isn't being deleted. - {terminate, 1}, + {terminate, 2}, %% Called when the queue is terminating and needs to delete all %% its content. - {delete_and_terminate, 1}, + {delete_and_terminate, 2}, %% Remove all messages in the queue, but not messages which have %% been fetched and are pending acks. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 2f71bfab..5873537c 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -198,22 +198,33 @@ list(VHostPath) -> Route)]. list_for_source(SrcName) -> - Route = #route{binding = #binding{source = SrcName, _ = '_'}}, - [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, - Route)]. + mnesia:async_dirty( + fun() -> + Route = #route{binding = #binding{source = SrcName, _ = '_'}}, + [B || #route{binding = B} + <- mnesia:match_object(rabbit_route, Route, read)] + end). list_for_destination(DstName) -> - Route = #route{binding = #binding{destination = DstName, _ = '_'}}, - [reverse_binding(B) || #reverse_route{reverse_binding = B} <- - mnesia:dirty_match_object(rabbit_reverse_route, - reverse_route(Route))]. + mnesia:async_dirty( + fun() -> + Route = #route{binding = #binding{destination = DstName, + _ = '_'}}, + [reverse_binding(B) || + #reverse_route{reverse_binding = B} <- + mnesia:match_object(rabbit_reverse_route, + reverse_route(Route), read)] + end). list_for_source_and_destination(SrcName, DstName) -> - Route = #route{binding = #binding{source = SrcName, - destination = DstName, - _ = '_'}}, - [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, - Route)]. + mnesia:async_dirty( + fun() -> + Route = #route{binding = #binding{source = SrcName, + destination = DstName, + _ = '_'}}, + [B || #route{binding = B} <- mnesia:match_object(rabbit_route, + Route, read)] + end). info_keys() -> ?INFO_KEYS. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index be1c08d8..1fef76ee 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -412,6 +412,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = Value) when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item([T | _] = Value) + when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse + is_list(T) -> + "[" ++ + lists:nthtail(2, lists:append( + [", " ++ format_info_item(E) || E <- Value])) ++ "]"; format_info_item(Value) -> io_lib:format("~w", [Value]). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 84a44cd2..cab1b99f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -24,7 +24,7 @@ info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). %% these must be run inside a mnesia tx --export([maybe_auto_delete/1, serial/1]). +-export([maybe_auto_delete/1, serial/1, peek_serial/1]). %%---------------------------------------------------------------------------- @@ -75,7 +75,8 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(serial/1:: (rabbit_types:exchange()) -> 'none' | pos_integer()). +-spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()). +-spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined'). -endif. @@ -93,7 +94,7 @@ recover() -> true -> store(X); false -> ok end, - rabbit_exchange:callback(X, create, [Tx, X]) + rabbit_exchange:callback(X, create, [map_create_tx(Tx), X]) end, rabbit_durable_exchange), [XName || #exchange{name = XName} <- Xs]. @@ -127,10 +128,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - ok = XT:create(case Tx of - true -> transaction; - false -> none - end, Exchange), + ok = XT:create(map_create_tx(Tx), Exchange), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> @@ -139,6 +137,9 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> Err end). +map_create_tx(true) -> transaction; +map_create_tx(false) -> none. + store(X = #exchange{name = Name, type = Type}) -> ok = mnesia:write(rabbit_exchange, X, write), case (type_to_module(Type)):serialise_events() of @@ -330,6 +331,12 @@ next_serial(XName) -> #exchange_serial{name = XName, next = Serial + 1}, write), Serial. +peek_serial(XName) -> + case mnesia:read({rabbit_exchange_serial, XName}) of + [#exchange_serial{next = Serial}] -> Serial; + _ -> undefined + end. + %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> {ok, Module} = rabbit_registry:lookup_module(exchange, T), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 53171e87..b6b97f6d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -25,7 +25,7 @@ protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). -export([dirty_read/1]). --export([table_lookup/2]). +-export([table_lookup/2, set_table_value/4]). -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). @@ -56,6 +56,7 @@ -export([const_ok/0, const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). +-export([pget/2, pget/3, pget_or_die/2]). %%---------------------------------------------------------------------------- @@ -104,6 +105,11 @@ -spec(table_lookup/2 :: (rabbit_framing:amqp_table(), binary()) -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}). +-spec(set_table_value/4 :: + (rabbit_framing:amqp_table(), binary(), + rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value()) + -> rabbit_framing:amqp_table()). + -spec(r/2 :: (rabbit_types:vhost(), K) -> rabbit_types:r3(rabbit_types:vhost(), K, '_') when is_subtype(K, atom())). @@ -196,6 +202,9 @@ -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). -spec(is_process_alive/1 :: (pid()) -> boolean()). +-spec(pget/2 :: (term(), [term()]) -> term()). +-spec(pget/3 :: (term(), [term()], term()) -> term()). +-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). -endif. @@ -268,6 +277,10 @@ table_lookup(Table, Key) -> false -> undefined end. +set_table_value(Table, Key, Type, Value) -> + sort_field_table( + lists:keystore(Key, 1, Table, {Key, Type, Value})). + r(#resource{virtual_host = VHostPath}, Kind, Name) when is_binary(Name) -> #resource{virtual_host = VHostPath, kind = Kind, name = Name}; @@ -897,3 +910,12 @@ is_process_alive(Pid) -> true -> true; _ -> false end. + +pget(K, P) -> proplists:get_value(K, P). +pget(K, P, D) -> proplists:get_value(K, P, D). + +pget_or_die(K, P) -> + case proplists:get_value(K, P) of + undefined -> exit({error, key_missing, K}); + V -> V + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 2df76d4e..568b9ce6 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -92,6 +92,10 @@ init() -> ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true, fun maybe_upgrade_local_or_record_desired/0), + %% We intuitively expect the global name server to be synced when + %% Mnesia is up. In fact that's not guaranteed to be the case - let's + %% make it so. + ok = global:sync(), ok. is_db_empty() -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1a37cdff..3f4aa54e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) -> {delta, {delta, undefined, 0, undefined}}, {q3, 0}, {q4, 0}, {len, 0}]), - _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)), + _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), passed. test_variable_queue() -> @@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count + Count, VQ3), {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), - _VQ6 = rabbit_variable_queue:terminate(VQ5), + _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), @@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> {_Guids, VQ4} = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:timeout(VQ4), - _VQ6 = rabbit_variable_queue:terminate(VQ5), + _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2336,7 +2336,7 @@ test_queue_recover() -> VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), + _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), rabbit_amqqueue:internal_delete(QName) end), passed. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index bead388d..5e4a1224 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -34,15 +34,15 @@ -ifdef(use_specs). --spec(remove_user_scope/0 :: () -> 'ok'). --spec(hash_passwords/0 :: () -> 'ok'). --spec(add_ip_to_listener/0 :: () -> 'ok'). --spec(internal_exchanges/0 :: () -> 'ok'). +-spec(remove_user_scope/0 :: () -> 'ok'). +-spec(hash_passwords/0 :: () -> 'ok'). +-spec(add_ip_to_listener/0 :: () -> 'ok'). +-spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). --spec(topic_trie/0 :: () -> 'ok'). +-spec(topic_trie/0 :: () -> 'ok'). +-spec(semi_durable_route/0 :: () -> 'ok'). -spec(exchange_event_serial/0 :: () -> 'ok'). --spec(semi_durable_route/0 :: () -> 'ok'). --spec(trace_exchanges/0 :: () -> 'ok'). +-spec(trace_exchanges/0 :: () -> 'ok'). -endif. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8ac3ad43..a167cca0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,7 @@ -module(rabbit_variable_queue). --export([init/4, terminate/1, delete_and_terminate/1, +-export([init/4, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, @@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, PersistentClient, TransientClient). -terminate(State) -> +terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = @@ -473,7 +473,7 @@ terminate(State) -> %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. -delete_and_terminate(State) -> +delete_and_terminate(_Reason, State) -> %% TODO: there is no need to interact with qi at all - which we do %% as part of 'purge' and 'remove_pending_ack', other than %% deleting it. |