summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-06-06 11:23:53 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-06-06 11:23:53 +0100
commita6c04df9b55944ec8f947d77697acf31dfd3626a (patch)
treed00327545097c2527ec5a7c630b225c2ac7708e7
parent0a4c43d10e88004e3912a91ee7be8efa8e50edc7 (diff)
parent4bfbabac01f9f790d91f241e5db6fcdc03fbe8e7 (diff)
downloadrabbitmq-server-a6c04df9b55944ec8f947d77697acf31dfd3626a.tar.gz
Merge bug24128
-rw-r--r--Makefile10
-rw-r--r--docs/examples-to-end.xsl4
-rw-r--r--docs/usage.xsl4
-rw-r--r--generate_app18
-rw-r--r--include/rabbit_backing_queue_spec.hrl4
-rw-r--r--src/gm_soak_test.erl8
-rw-r--r--src/rabbit_amqqueue.erl18
-rw-r--r--src/rabbit_amqqueue_process.erl19
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_binding.erl35
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_exchange.erl21
-rw-r--r--src/rabbit_misc.erl24
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_upgrade_functions.erl14
-rw-r--r--src/rabbit_variable_queue.erl6
17 files changed, 128 insertions, 79 deletions
diff --git a/Makefile b/Makefile
index e376b4ac..a347689b 100644
--- a/Makefile
+++ b/Makefile
@@ -93,8 +93,8 @@ $(DEPS_FILE): $(SOURCES) $(INCLUDES)
rm -f $@
echo $(subst : ,:,$(foreach FILE,$^,$(FILE):)) | escript generate_deps $@ $(EBIN_DIR)
-$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
- escript generate_app $(EBIN_DIR) $@ < $<
+$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(SOURCES) generate_app
+ escript generate_app $< $@ $(SOURCE_DIR)
$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl | $(DEPS_FILE)
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
@@ -233,7 +233,7 @@ distclean: clean
# xmlto can not read from standard input, so we mess with a tmp file.
%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl
xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \
- xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
+ xsltproc --novalid $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
xmlto -o $(DOCS_DIR) $$opt man $<.tmp && \
gzip -f $(DOCS_DIR)/`basename $< .xml`
rm -f $<.tmp
@@ -242,7 +242,7 @@ distclean: clean
# Do not fold the cp into previous line, it's there to stop the file being
# generated but empty if we fail
$(SOURCE_DIR)/%_usage.erl:
- xsltproc --stringparam modulename "`basename $@ .erl`" \
+ xsltproc --novalid --stringparam modulename "`basename $@ .erl`" \
$(DOCS_DIR)/usage.xsl $< > $@.tmp
sed -e 's/"/\\"/g' -e 's/%QUOTE%/"/g' $@.tmp > $@.tmp2
fold -s $@.tmp2 > $@.tmp3
@@ -256,7 +256,7 @@ $(SOURCE_DIR)/%_usage.erl:
xmlto xhtml-nochunks `basename $< .xml`.xml ; rm `basename $< .xml`.xml
cat `basename $< .xml`.html | \
xsltproc --novalid $(DOCS_DIR)/remove-namespaces.xsl - | \
- xsltproc --stringparam original `basename $<` $(DOCS_DIR)/html-to-website-xml.xsl - | \
+ xsltproc --novalid --stringparam original `basename $<` $(DOCS_DIR)/html-to-website-xml.xsl - | \
xmllint --format - > $@
rm `basename $< .xml`.html
diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl
index d9686ada..a0a74178 100644
--- a/docs/examples-to-end.xsl
+++ b/docs/examples-to-end.xsl
@@ -1,9 +1,5 @@
<?xml version='1.0'?>
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
- xmlns:exsl="http://exslt.org/common"
- xmlns:ng="http://docbook.org/docbook-ng"
- xmlns:db="http://docbook.org/ns/docbook"
- exclude-result-prefixes="exsl ng db"
version='1.0'>
<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" />
diff --git a/docs/usage.xsl b/docs/usage.xsl
index a6cebd93..586f8303 100644
--- a/docs/usage.xsl
+++ b/docs/usage.xsl
@@ -1,9 +1,5 @@
<?xml version='1.0'?>
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
- xmlns:exsl="http://exslt.org/common"
- xmlns:ng="http://docbook.org/docbook-ng"
- xmlns:db="http://docbook.org/ns/docbook"
- exclude-result-prefixes="exsl"
version='1.0'>
<xsl:param name="modulename"/>
diff --git a/generate_app b/generate_app
index 576b485e..fb0eb1ea 100644
--- a/generate_app
+++ b/generate_app
@@ -1,12 +1,16 @@
#!/usr/bin/env escript
%% -*- erlang -*-
-main([BeamDir, TargetFile]) ->
- Modules = [list_to_atom(filename:basename(F, ".beam")) ||
- F <- filelib:wildcard("*.beam", BeamDir)],
- {ok, {application, Application, Properties}} = io:read(''),
- NewProperties = lists:keyreplace(modules, 1, Properties,
- {modules, Modules}),
+main([InFile, OutFile | SrcDirs]) ->
+ Modules = [list_to_atom(filename:basename(F, ".erl")) ||
+ SrcDir <- SrcDirs,
+ F <- filelib:wildcard("*.erl", SrcDir)],
+ {ok, [{application, Application, Properties}]} = file:consult(InFile),
+ NewProperties =
+ case proplists:get_value(modules, Properties) of
+ [] -> lists:keyreplace(modules, 1, Properties, {modules, Modules});
+ _ -> Properties
+ end,
file:write_file(
- TargetFile,
+ OutFile,
io_lib:format("~p.~n", [{application, Application, NewProperties}])).
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1c2b94e2..295d9039 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -32,8 +32,8 @@
-spec(stop/0 :: () -> 'ok').
-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
async_callback(), sync_callback()) -> state()).
--spec(terminate/1 :: (state()) -> state()).
--spec(delete_and_terminate/1 :: (state()) -> state()).
+-spec(terminate/2 :: (any(), state()) -> state()).
+-spec(delete_and_terminate/2 :: (any(), state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/4 :: (rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state()) ->
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index dae42ac7..5e5a3a5a 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -80,12 +80,12 @@ handle_msg([], From, {test_msg, Num}) ->
{ok, Num} -> ok;
{ok, Num1} when Num < Num1 ->
exit({{from, From},
- {duplicate_delivery_of, Num1},
- {expecting, Num}});
+ {duplicate_delivery_of, Num},
+ {expecting, Num1}});
{ok, Num1} ->
exit({{from, From},
- {missing_delivery_of, Num},
- {received_early, Num1}});
+ {received_early, Num},
+ {expecting, Num1}});
error ->
exit({{from, From},
{received_premature_delivery, Num}})
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f9ed3edc..c8703740 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -197,12 +197,12 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
+ Q = start_queue_process(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
exclusive_owner = Owner,
- pid = none}),
+ pid = none}),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -487,11 +487,11 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
+ #amqqueue{name = QueueName,
+ durable = false,
auto_delete = false,
- arguments = [],
- pid = Pid}.
+ arguments = [],
+ pid = Pid}.
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8091e2c2..07a24af8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -97,12 +97,11 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
+ backing_queue = backing_queue_module(Q),
backing_queue_state = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -115,16 +114,16 @@ init(Q) ->
msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(shutdown, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate(_Reason, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate(Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
rabbit_event:notify(
queue_deleted, [{pid, self()}]),
- BQS1 = BQ:delete_and_terminate(BQS),
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
rabbit_amqqueue:internal_delete(qname(State)),
@@ -226,6 +225,10 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
+backing_queue_module(#amqqueue{}) ->
+ {ok, BQM} = application:get_env(backing_queue_module),
+ BQM.
+
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index addaabc5..217ad3eb 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -49,11 +49,11 @@ behaviour_info(callbacks) ->
{init, 4},
%% Called on queue shutdown when queue isn't being deleted.
- {terminate, 1},
+ {terminate, 2},
%% Called when the queue is terminating and needs to delete all
%% its content.
- {delete_and_terminate, 1},
+ {delete_and_terminate, 2},
%% Remove all messages in the queue, but not messages which have
%% been fetched and are pending acks.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 2f71bfab..5873537c 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -198,22 +198,33 @@ list(VHostPath) ->
Route)].
list_for_source(SrcName) ->
- Route = #route{binding = #binding{source = SrcName, _ = '_'}},
- [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
- Route)].
+ mnesia:async_dirty(
+ fun() ->
+ Route = #route{binding = #binding{source = SrcName, _ = '_'}},
+ [B || #route{binding = B}
+ <- mnesia:match_object(rabbit_route, Route, read)]
+ end).
list_for_destination(DstName) ->
- Route = #route{binding = #binding{destination = DstName, _ = '_'}},
- [reverse_binding(B) || #reverse_route{reverse_binding = B} <-
- mnesia:dirty_match_object(rabbit_reverse_route,
- reverse_route(Route))].
+ mnesia:async_dirty(
+ fun() ->
+ Route = #route{binding = #binding{destination = DstName,
+ _ = '_'}},
+ [reverse_binding(B) ||
+ #reverse_route{reverse_binding = B} <-
+ mnesia:match_object(rabbit_reverse_route,
+ reverse_route(Route), read)]
+ end).
list_for_source_and_destination(SrcName, DstName) ->
- Route = #route{binding = #binding{source = SrcName,
- destination = DstName,
- _ = '_'}},
- [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
- Route)].
+ mnesia:async_dirty(
+ fun() ->
+ Route = #route{binding = #binding{source = SrcName,
+ destination = DstName,
+ _ = '_'}},
+ [B || #route{binding = B} <- mnesia:match_object(rabbit_route,
+ Route, read)]
+ end).
info_keys() -> ?INFO_KEYS.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index be1c08d8..1fef76ee 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -412,6 +412,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
Value) when is_binary(TableEntryKey) andalso
is_atom(TableEntryType) ->
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item([T | _] = Value)
+ when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse
+ is_list(T) ->
+ "[" ++
+ lists:nthtail(2, lists:append(
+ [", " ++ format_info_item(E) || E <- Value])) ++ "]";
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 84a44cd2..cab1b99f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -24,7 +24,7 @@
info_keys/0, info/1, info/2, info_all/1, info_all/2,
publish/2, delete/2]).
%% these must be run inside a mnesia tx
--export([maybe_auto_delete/1, serial/1]).
+-export([maybe_auto_delete/1, serial/1, peek_serial/1]).
%%----------------------------------------------------------------------------
@@ -75,7 +75,8 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(serial/1:: (rabbit_types:exchange()) -> 'none' | pos_integer()).
+-spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()).
+-spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined').
-endif.
@@ -93,7 +94,7 @@ recover() ->
true -> store(X);
false -> ok
end,
- rabbit_exchange:callback(X, create, [Tx, X])
+ rabbit_exchange:callback(X, create, [map_create_tx(Tx), X])
end,
rabbit_durable_exchange),
[XName || #exchange{name = XName} <- Xs].
@@ -127,10 +128,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- ok = XT:create(case Tx of
- true -> transaction;
- false -> none
- end, Exchange),
+ ok = XT:create(map_create_tx(Tx), Exchange),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
@@ -139,6 +137,9 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
Err
end).
+map_create_tx(true) -> transaction;
+map_create_tx(false) -> none.
+
store(X = #exchange{name = Name, type = Type}) ->
ok = mnesia:write(rabbit_exchange, X, write),
case (type_to_module(Type)):serialise_events() of
@@ -330,6 +331,12 @@ next_serial(XName) ->
#exchange_serial{name = XName, next = Serial + 1}, write),
Serial.
+peek_serial(XName) ->
+ case mnesia:read({rabbit_exchange_serial, XName}) of
+ [#exchange_serial{next = Serial}] -> Serial;
+ _ -> undefined
+ end.
+
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
{ok, Module} = rabbit_registry:lookup_module(exchange, T),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 53171e87..b6b97f6d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -25,7 +25,7 @@
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
--export([table_lookup/2]).
+-export([table_lookup/2, set_table_value/4]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
@@ -56,6 +56,7 @@
-export([const_ok/0, const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
+-export([pget/2, pget/3, pget_or_die/2]).
%%----------------------------------------------------------------------------
@@ -104,6 +105,11 @@
-spec(table_lookup/2 ::
(rabbit_framing:amqp_table(), binary())
-> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
+-spec(set_table_value/4 ::
+ (rabbit_framing:amqp_table(), binary(),
+ rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value())
+ -> rabbit_framing:amqp_table()).
+
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
when is_subtype(K, atom())).
@@ -196,6 +202,9 @@
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
-spec(is_process_alive/1 :: (pid()) -> boolean()).
+-spec(pget/2 :: (term(), [term()]) -> term()).
+-spec(pget/3 :: (term(), [term()], term()) -> term()).
+-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
-endif.
@@ -268,6 +277,10 @@ table_lookup(Table, Key) ->
false -> undefined
end.
+set_table_value(Table, Key, Type, Value) ->
+ sort_field_table(
+ lists:keystore(Key, 1, Table, {Key, Type, Value})).
+
r(#resource{virtual_host = VHostPath}, Kind, Name)
when is_binary(Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};
@@ -897,3 +910,12 @@ is_process_alive(Pid) ->
true -> true;
_ -> false
end.
+
+pget(K, P) -> proplists:get_value(K, P).
+pget(K, P, D) -> proplists:get_value(K, P, D).
+
+pget_or_die(K, P) ->
+ case proplists:get_value(K, P) of
+ undefined -> exit({error, key_missing, K});
+ V -> V
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 2df76d4e..568b9ce6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -92,6 +92,10 @@ init() ->
ensure_mnesia_dir(),
ok = init_db(read_cluster_nodes_config(), true,
fun maybe_upgrade_local_or_record_desired/0),
+ %% We intuitively expect the global name server to be synced when
+ %% Mnesia is up. In fact that's not guaranteed to be the case - let's
+ %% make it so.
+ ok = global:sync(),
ok.
is_db_empty() ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1a37cdff..3f4aa54e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) ->
{delta, {delta, undefined, 0, undefined}},
{q3, 0}, {q4, 0},
{len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
+ _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
test_variable_queue() ->
@@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count + Count, VQ3),
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{_Guids, VQ4} =
rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2336,7 +2336,7 @@ test_queue_recover() ->
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
- _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
+ _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index bead388d..5e4a1224 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -34,15 +34,15 @@
-ifdef(use_specs).
--spec(remove_user_scope/0 :: () -> 'ok').
--spec(hash_passwords/0 :: () -> 'ok').
--spec(add_ip_to_listener/0 :: () -> 'ok').
--spec(internal_exchanges/0 :: () -> 'ok').
+-spec(remove_user_scope/0 :: () -> 'ok').
+-spec(hash_passwords/0 :: () -> 'ok').
+-spec(add_ip_to_listener/0 :: () -> 'ok').
+-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
--spec(topic_trie/0 :: () -> 'ok').
+-spec(topic_trie/0 :: () -> 'ok').
+-spec(semi_durable_route/0 :: () -> 'ok').
-spec(exchange_event_serial/0 :: () -> 'ok').
--spec(semi_durable_route/0 :: () -> 'ok').
--spec(trace_exchanges/0 :: () -> 'ok').
+-spec(trace_exchanges/0 :: () -> 'ok').
-endif.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8ac3ad43..a167cca0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/4, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
@@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
-terminate(State) ->
+terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
@@ -473,7 +473,7 @@ terminate(State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
-delete_and_terminate(State) ->
+delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
%% as part of 'purge' and 'remove_pending_ack', other than
%% deleting it.