diff options
authorSimon MacMullen <>2010-11-19 15:48:29 +0000
committerSimon MacMullen <>2010-11-19 15:48:29 +0000
commit88913f8e9bc1145898f1e50f1315675e6b215e72 (patch)
parente43f214f918478b382d79f050dbdc42a469a001d (diff)
parent5f7e24198de005dd24ef20f73299a6848c15dcde (diff)
Merged default into bug23467
14 files changed, 337 insertions, 212 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 64c75102..e6657b32 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -291,11 +291,10 @@ run_boot_step({StepName, Attributes}) ->
io:format("-- ~s~n", [Description]);
MFAs ->
io:format("starting ~-60s ...", [Description]),
- [case catch apply(M,F,A) of
- {'EXIT', Reason} ->
- boot_error("FAILED~nReason: ~p~n", [Reason]);
- ok ->
- ok
+ [try
+ apply(M,F,A)
+ catch
+ _:Reason -> boot_error("FAILED~nReason: ~p~n", [Reason])
end || {M,F,A} <- MFAs],
@@ -315,43 +314,43 @@ edges(_Module, Steps) ->
{Key, OtherStep} <- Atts,
Key =:= requires orelse Key =:= enables].
-graph_build_error({vertex, duplicate, StepName}) ->
- boot_error("Duplicate boot step name: ~w~n", [StepName]);
-graph_build_error({edge, Reason, From, To}) ->
- boot_error(
- "Could not add boot step dependency of ~w on ~w:~n~s",
- [To, From,
- case Reason of
- {bad_vertex, V} ->
- io_lib:format("Boot step not registered: ~w~n", [V]);
- {bad_edge, [First | Rest]} ->
- [io_lib:format("Cyclic dependency: ~w", [First]),
- [io_lib:format(" depends on ~w", [Next]) || Next <- Rest],
- io_lib:format(" depends on ~w~n", [First])]
- end]).
sort_boot_steps(UnsortedSteps) ->
- G = rabbit_misc:build_acyclic_graph(
- fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps),
- %% Use topological sort to find a consistent ordering (if there is
- %% one, otherwise fail).
- SortedStepsRev = [begin
- {StepName, Step} = digraph:vertex(G, StepName),
- Step
- end || StepName <- digraph_utils:topsort(G)],
- SortedSteps = lists:reverse(SortedStepsRev),
- digraph:delete(G),
- %% Check that all mentioned {M,F,A} triples are exported.
- case [{StepName, {M,F,A}}
- || {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
- not erlang:function_exported(M, F, length(A))] of
- [] -> SortedSteps;
- MissingFunctions -> boot_error("Boot step functions not exported: ~p~n",
- [MissingFunctions])
+ case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
+ UnsortedSteps) of
+ {ok, G} ->
+ %% Use topological sort to find a consistent ordering (if
+ %% there is one, otherwise fail).
+ SortedSteps = lists:reverse(
+ [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)]),
+ digraph:delete(G),
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}} ||
+ {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error(
+ "Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ boot_error("Duplicate boot step name: ~w~n", [StepName]);
+ {error, {edge, Reason, From, To}} ->
+ boot_error(
+ "Could not add boot step dependency of ~w on ~w:~n~s",
+ [To, From,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next]) ||
+ Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end])
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index a983300a..2dc8f172 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -37,7 +37,7 @@
check_vhost_access/2, check_resource_access/3]).
-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
clear_admin/1, list_users/0, lookup_user/1]).
+-export([change_password_hash/2, hash_password/1]).
-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
-export([set_permissions/5, clear_permissions/2,
list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
@@ -71,6 +71,7 @@
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
-spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok').
+-spec(hash_password/1 :: (password()) -> password_hash()).
-spec(set_admin/1 :: (username()) -> 'ok').
-spec(clear_admin/1 :: (username()) -> 'ok').
-spec(list_users/0 :: () -> [{username(), boolean()}]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 75f285df..a999fe58 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -669,7 +669,10 @@ i(Item, _) ->
throw({bad_argument, Item}).
emit_stats(State) ->
- rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)).
+ emit_stats(State, []).
+emit_stats(State, Extra) ->
+ rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
@@ -1053,7 +1056,10 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end),
+ rabbit_event:if_enabled(StatsTimer,
+ fun () ->
+ emit_stats(State, [{idle_since, now()}])
+ end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
backing_queue_state = BQS2},
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 19613a57..800cc237 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -267,9 +267,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- rabbit_event:if_enabled(StatsTimer, fun () ->
- internal_emit_stats(State)
- end),
+ rabbit_event:if_enabled(StatsTimer,
+ fun () ->
+ internal_emit_stats(
+ State, [{idle_since, now()}])
+ end),
State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}.
@@ -1201,11 +1203,14 @@ update_measures(Type, QX, Inc, Measure) ->
put({Type, QX},
orddict:store(Measure, Cur + Inc, Measures)).
-internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
+internal_emit_stats(State) ->
+ internal_emit_stats(State, []).
+internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
CoarseStats = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(StatsTimer) of
coarse ->
- rabbit_event:notify(channel_stats, CoarseStats);
+ rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
fine ->
FineStats =
@@ -1215,7 +1220,8 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
[{QX, Stats} ||
{{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats, CoarseStats ++ FineStats)
+ rabbit_event:notify(channel_stats,
+ Extra ++ CoarseStats ++ FineStats)
erase_queue_stats(QPid) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6c0a727b..72b77b1f 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -94,9 +94,7 @@ start() ->
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
print_error("invalid command '~s'",
- [lists:flatten(
- rabbit_misc:intersperse(
- " ", [atom_to_list(Command) | Args]))]),
+ [string:join([atom_to_list(Command) | Args], " ")]),
{error, Reason} ->
print_error("~p", [Reason]),
@@ -321,7 +319,7 @@ display_info_list(Other, _) ->
display_row(Row) ->
- io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
+ io:fwrite(string:join(Row, "\t")),
-define(IS_U8(X), (X >= 0 andalso X =< 255)).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index f41b76d8..9cc70a26 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -228,7 +228,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X = #exchange{name = XName}, Delivery) ->
- route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}),
+ route(Delivery, {queue:from_list([X]), XName, []}),
route(Delivery, {WorkList, SeenXs, QNames}) ->
@@ -252,13 +252,22 @@ process_alternate(_X, Results) ->
process_route(#resource{kind = exchange} = XName,
+ {_WorkList, XName, _QNames} = Acc) ->
+ Acc;
+process_route(#resource{kind = exchange} = XName,
+ {WorkList, #resource{kind = exchange} = SeenX, QNames}) ->
+ {case lookup(XName) of
+ {ok, X} -> queue:in(X, WorkList);
+ {error, not_found} -> WorkList
+ end, gb_sets:from_list([SeenX, XName]), QNames};
+process_route(#resource{kind = exchange} = XName,
{WorkList, SeenXs, QNames} = Acc) ->
- case sets:is_element(XName, SeenXs) of
+ case gb_sets:is_element(XName, SeenXs) of
true -> Acc;
false -> {case lookup(XName) of
{ok, X} -> queue:in(X, WorkList);
{error, not_found} -> WorkList
- end, sets:add_element(XName, SeenXs), QNames}
+ end, gb_sets:add_element(XName, SeenXs), QNames}
process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0522afdc..230f4db5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -50,7 +50,7 @@
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
--export([intersperse/2, upmap/2, map_in_order/2]).
+-export([upmap/2, map_in_order/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2]).
@@ -64,7 +64,7 @@
-export([recursive_delete/1, dict_cons/3, orddict_cons/3,
--export([all_module_attributes/1, build_acyclic_graph/4]).
+-export([all_module_attributes/1, build_acyclic_graph/3]).
@@ -86,10 +86,9 @@
:: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
-type(digraph_label() :: term()).
-type(graph_vertex_fun() ::
- fun ((atom(), [term()]) -> {digraph:vertex(), digraph_label()})).
+ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
- fun ((atom(), [term()]) -> {digraph:vertex(), digraph:vertex()})).
--type(graph_error_fun() :: fun ((any()) -> any() | no_return())).
+ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -155,7 +154,6 @@
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
--spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A).
@@ -191,9 +189,13 @@
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
--spec(build_acyclic_graph/4 :: (graph_vertex_fun(), graph_edge_fun(),
- graph_error_fun(), [{atom(), [term()]}]) ->
- digraph()).
+-spec(build_acyclic_graph/3 ::
+ (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
+ -> rabbit_types:ok_or_error2(digraph(),
+ {'vertex', 'duplicate', digraph:vertex()} |
+ {'edge', ({bad_vertex, digraph:vertex()} |
+ {bad_edge, [digraph:vertex()]}),
+ digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
@@ -414,10 +416,6 @@ tcp_name(Prefix, IPAddress, Port)
[Prefix, inet_parse:ntoa(IPAddress), Port]))).
-intersperse(_, []) -> [];
-intersperse(_, [E]) -> [E];
-intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)].
%% This is a modified version of Luke Gorrie's pmap -
%% - that doesn't care about
%% the order in which results are received.
@@ -765,16 +763,21 @@ all_module_attributes(Name) ->
end, [], Modules).
-build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) ->
+build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
G = digraph:new([acyclic]),
- [ case digraph:vertex(G, Vertex) of
- false -> digraph:add_vertex(G, Vertex, Label);
- _ -> ErrorFun({vertex, duplicate, Vertex})
- end || {Module, Atts} <- Graph,
- {Vertex, Label} <- VertexFun(Module, Atts) ],
- [ case digraph:add_edge(G, From, To) of
- {error, E} -> ErrorFun({edge, E, From, To});
- _ -> ok
- end || {Module, Atts} <- Graph,
- {From, To} <- EdgeFun(Module, Atts) ],
- G.
+ try
+ [case digraph:vertex(G, Vertex) of
+ false -> digraph:add_vertex(G, Vertex, Label);
+ _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}})
+ end || {Module, Atts} <- Graph,
+ {Vertex, Label} <- VertexFun(Module, Atts)],
+ [case digraph:add_edge(G, From, To) of
+ {error, E} -> throw({graph_error, {edge, E, From, To}});
+ _ -> ok
+ end || {Module, Atts} <- Graph,
+ {From, To} <- EdgeFun(Module, Atts)],
+ {ok, G}
+ catch {graph_error, Reason} ->
+ true = digraph:delete(G),
+ {error, Reason}
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 9d172269..cb3251c7 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -361,16 +361,15 @@ init_db(ClusterNodes, Force) ->
case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
{ok, Nodes} ->
case Force of
- false ->
- FailedClusterNodes = ProperClusterNodes -- Nodes,
- case FailedClusterNodes of
- [] -> ok;
- _ ->
- throw({error, {failed_to_cluster_with,
- FailedClusterNodes,
- "Mnesia could not connect to some nodes."}})
- end;
- _ -> ok
+ false -> FailedClusterNodes = ProperClusterNodes -- Nodes,
+ case FailedClusterNodes of
+ [] -> ok;
+ _ -> throw({error, {failed_to_cluster_with,
+ FailedClusterNodes,
+ "Mnesia could not connect "
+ "to some nodes."}})
+ end;
+ true -> ok
case {Nodes, mnesia:system_info(use_dir),
mnesia:system_info(db_nodes)} of
@@ -379,7 +378,7 @@ init_db(ClusterNodes, Force) ->
case rabbit_upgrade:maybe_upgrade() of
ok ->
- schema_ok_or_exit();
+ ensure_schema_ok();
version_not_available ->
@@ -387,15 +386,15 @@ init_db(ClusterNodes, Force) ->
%% "Master" (i.e. without config) disc node in cluster,
%% verify schema
- version_ok_or_exit(rabbit_upgrade:read_version()),
- schema_ok_or_exit();
+ ensure_version_ok(rabbit_upgrade:read_version()),
+ ensure_schema_ok();
{[], false, _} ->
%% First RAM node in cluster, start from scratch
ok = create_schema();
{[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
- version_ok_or_exit(rabbit_upgrade:read_version()),
- version_ok_or_exit(
+ ensure_version_ok(rabbit_upgrade:read_version()),
+ ensure_version_ok(
rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
@@ -405,14 +404,13 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
- schema_ok_or_exit()
+ ensure_schema_ok()
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
%% are members of a different cluster
- throw({error, {unable_to_join_cluster,
- ClusterNodes, Reason}})
+ throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
schema_ok_or_move() ->
@@ -430,22 +428,19 @@ schema_ok_or_move() ->
ok = create_schema()
-version_ok_or_exit({ok, DiscVersion}) ->
+ensure_version_ok({ok, DiscVersion}) ->
case rabbit_upgrade:desired_version() of
- DiscVersion ->
- ok;
- DesiredVersion ->
- exit({schema_mismatch, DesiredVersion, DiscVersion})
+ DiscVersion -> ok;
+ DesiredVersion -> throw({error, {schema_mismatch,
+ DesiredVersion, DiscVersion}})
-version_ok_or_exit({error, _}) ->
+ensure_version_ok({error, _}) ->
ok = rabbit_upgrade:write_version().
-schema_ok_or_exit() ->
+ensure_schema_ok() ->
case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- exit({schema_invalid, Reason})
+ ok -> ok;
+ {error, Reason} -> throw({error, {schema_invalid, Reason}})
create_schema() ->
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 0440dbe4..0030216e 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -65,8 +65,7 @@ start() ->
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
print_error("invalid command '~s'",
- [lists:flatten(
- rabbit_misc:intersperse(" ", FullCommand))]),
+ [string:join(FullCommand, " ")]),
timeout ->
print_error("timeout starting some nodes.", []),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bde9b3d3..248c1fbc 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -36,6 +36,8 @@
publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
-define(CLEAN_FILENAME, "").
@@ -180,6 +182,8 @@
+-rabbit_upgrade({add_queue_ttl, []}).
-type(hdl() :: ('undefined' | any())).
@@ -226,6 +230,8 @@
-spec(recover/1 ::
([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}).
+-spec(add_queue_ttl/0 :: () -> 'ok').
@@ -345,35 +351,36 @@ recover(DurableQueues) ->
DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
- Directories = case file:list_dir(QueuesDir) of
- {ok, Entries} -> [ Entry || Entry <- Entries,
- filelib:is_dir(
- filename:join(
- QueuesDir, Entry)) ];
- {error, enoent} -> []
- end,
+ QueueDirNames = all_queue_directory_names(QueuesDir),
DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
{DurableQueueNames, DurableTerms} =
- fun (QueueDir, {DurableAcc, TermsAcc}) ->
- case sets:is_element(QueueDir, DurableDirectories) of
+ fun (QueueDirName, {DurableAcc, TermsAcc}) ->
+ QueueDirPath = filename:join(QueuesDir, QueueDirName),
+ case sets:is_element(QueueDirName, DurableDirectories) of
true ->
TermsAcc1 =
- case read_shutdown_terms(
- filename:join(QueuesDir, QueueDir)) of
+ case read_shutdown_terms(QueueDirPath) of
{error, _} -> TermsAcc;
{ok, Terms} -> [Terms | TermsAcc]
- {[dict:fetch(QueueDir, DurableDict) | DurableAcc],
+ {[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
false ->
- Dir = filename:join(queues_dir(), QueueDir),
- ok = rabbit_misc:recursive_delete([Dir]),
+ ok = rabbit_misc:recursive_delete([QueueDirPath]),
{DurableAcc, TermsAcc}
- end, {[], []}, Directories),
+ end, {[], []}, QueueDirNames),
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+all_queue_directory_names(Dir) ->
+ case file:list_dir(Dir) of
+ {ok, Entries} -> [ Entry || Entry <- Entries,
+ filelib:is_dir(
+ filename:join(Dir, Entry)) ];
+ {error, enoent} -> []
+ end.
%% startup and shutdown
@@ -972,3 +979,87 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) ->
{{no_pub, no_del, ack}, 0};
journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) ->
{undefined, -1}.
+%% upgrade
+add_queue_ttl() ->
+ foreach_queue_index({fun add_queue_ttl_journal/1,
+ fun add_queue_ttl_segment/1}).
+add_queue_ttl_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Guid:?GUID_BYTES/binary, Rest/binary>>) ->
+ {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid,
+ expiry_to_binary(undefined)], Rest};
+add_queue_ttl_journal(_) ->
+ stop.
+add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary,
+ Rest/binary>>) ->
+ RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest};
+ RelSeq:?REL_SEQ_BITS, Rest>>) ->
+ Rest};
+add_queue_ttl_segment(_) ->
+ stop.
+foreach_queue_index(Funs) ->
+ QueuesDir = queues_dir(),
+ QueueDirNames = all_queue_directory_names(QueuesDir),
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () ->
+ transform_queue(filename:join(QueuesDir, QueueDirName),
+ Gatherer, Funs)
+ end)
+ end || QueueDirName <- QueueDirNames],
+ empty = gatherer:out(Gatherer),
+ ok = gatherer:stop(Gatherer),
+ ok = rabbit_misc:unlink_and_capture_exit(Gatherer).
+transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
+ ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
+ [ok = transform_file(filename:join(Dir, Seg), SegmentFun)
+ || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
+ ok = gatherer:finish(Gatherer).
+transform_file(Path, Fun) ->
+ PathTmp = Path ++ ".upgrade",
+ Size = filelib:file_size(Path),
+ {ok, PathTmpHdl} =
+ file_handle_cache:open(PathTmp, [exclusive | ?WRITE_MODE],
+ [{write_buffer, infinity}]),
+ {ok, PathHdl} =
+ file_handle_cache:open(Path, [{read_ahead, Size} | ?READ_MODE], []),
+ {ok, Content} = file_handle_cache:read(PathHdl, Size),
+ ok = file_handle_cache:close(PathHdl),
+ ok = drive_transform_fun(Fun, PathTmpHdl, Content),
+ ok = file_handle_cache:close(PathTmpHdl),
+ ok = file:rename(PathTmp, Path).
+drive_transform_fun(Fun, Hdl, Contents) ->
+ case Fun(Contents) of
+ stop ->
+ ok;
+ {Output, Contents1} ->
+ ok = file_handle_cache:append(Hdl, Output),
+ drive_transform_fun(Fun, Hdl, Contents1)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 23b2ca34..2d0e3af3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -45,10 +45,6 @@
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
@@ -325,13 +321,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
- %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
{inet_async, Sock, Ref, {ok, Data}} ->
- {State1, Callback1, Length1} =
- handle_input(State#v1.callback, Data,
- State#v1{recv_ref = none}),
- mainloop(Deb, switch_callback(State1, Callback1, Length1));
+ mainloop(Deb, handle_input(State#v1.callback, Data,
+ State#v1{recv_ref = none}));
{inet_async, Sock, Ref, {error, closed}} ->
if State#v1.connection_state =:= closed ->
@@ -571,7 +564,6 @@ handle_frame(Type, Channel, Payload,
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
- %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{ch_fr_pid, ChFrPid} ->
ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
@@ -635,18 +627,18 @@ analyze_frame(_Type, _Body, _Protocol) ->
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- %%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]),
- {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1};
+ ensure_stats_timer(
+ switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1));
-handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) ->
+handle_input({frame_payload, Type, Channel, PayloadSize},
+ PayloadAndMarker, State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
- %%?LOGDEBUG("Frame completed: ~p/~p/~p~n", [Type, Channel, Payload]),
- NewState = handle_frame(Type, Channel, Payload, State),
- {NewState, frame_header, 7};
+ handle_frame(Type, Channel, Payload,
+ switch_callback(State, frame_header, 7));
_ ->
- throw({bad_payload, PayloadAndMarker})
+ throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
%% The two rules pertaining to version negotiation:
@@ -698,11 +690,11 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
mechanisms = auth_mechanisms_binary(Sock),
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT,
- protocol = Protocol},
- connection_state = starting},
- frame_header, 7}.
+ switch_callback(State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7).
refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 4452b075..a4da23e2 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -114,14 +114,11 @@ find_by_type(Type, {rdnSequence, RDNs}) ->
%% Format and rdnSequence as a RFC4514 subject string.
format_rdn_sequence({rdnSequence, Seq}) ->
- lists:flatten(
- rabbit_misc:intersperse(
- ",", lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]))).
+ string:join(lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]), ",").
%% Format an RDN set.
format_complex_rdn(RDNs) ->
- lists:flatten(
- rabbit_misc:intersperse("+", [format_rdn(RDN) || RDN <- RDNs])).
+ string:join([format_rdn(RDN) || RDN <- RDNs], "+").
%% Format an RDN. If the type name is unknown, use the dotted decimal
%% representation. See RFC4514, section 2.3.
@@ -148,7 +145,7 @@ format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) ->
io_lib:format(Fmt ++ "=~s", [FV]);
none when is_tuple(T) ->
TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)],
- io_lib:format("~s:~s", [rabbit_misc:intersperse(".", TypeL), FV]);
+ io_lib:format("~s:~s", [string:join(TypeL, "."), FV]);
none ->
io_lib:format("~p:~s", [T, FV])
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 0071a08a..27a94f6f 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -32,11 +32,13 @@
+-type(step() :: atom()).
+-type(version() :: [step()]).
-spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
--spec(read_version/0 ::
- () -> {'ok', [any()]} | rabbit_types:error(any())).
+-spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
-spec(write_version/0 :: () -> 'ok').
--spec(desired_version/0 :: () -> [atom()]).
+-spec(desired_version/0 :: () -> version()).
@@ -48,26 +50,25 @@
maybe_upgrade() ->
case read_version() of
{ok, CurrentHeads} ->
- G = load_graph(),
- case unknown_heads(CurrentHeads, G) of
- [] ->
- case upgrades_to_apply(CurrentHeads, G) of
- [] -> ok;
- Upgrades -> apply_upgrades(Upgrades)
- end;
- Unknown ->
- exit({future_upgrades_found, Unknown})
- end,
- true = digraph:delete(G),
- ok;
+ with_upgrade_graph(
+ fun (G) ->
+ case unknown_heads(CurrentHeads, G) of
+ [] -> case upgrades_to_apply(CurrentHeads, G) of
+ [] -> ok;
+ Upgrades -> apply_upgrades(Upgrades)
+ end;
+ Unknown -> throw({error,
+ {future_upgrades_found, Unknown}})
+ end
+ end);
{error, enoent} ->
read_version() ->
case rabbit_misc:read_term_file(schema_filename()) of
- {ok, [Heads]} -> {ok, Heads};
- {error, E} -> {error, E}
+ {ok, [Heads]} -> {ok, Heads};
+ {error, _} = Err -> Err
write_version() ->
@@ -75,17 +76,26 @@ write_version() ->
desired_version() ->
- G = load_graph(),
- Version = heads(G),
- true = digraph:delete(G),
- Version.
+ with_upgrade_graph(fun (G) -> heads(G) end).
%% -------------------------------------------------------------------
-load_graph() ->
- Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade),
- rabbit_misc:build_acyclic_graph(
- fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades).
+with_upgrade_graph(Fun) ->
+ case rabbit_misc:build_acyclic_graph(
+ fun vertices/2, fun edges/2,
+ rabbit_misc:all_module_attributes(rabbit_upgrade)) of
+ {ok, G} -> try
+ Fun(G)
+ after
+ true = digraph:delete(G)
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ throw({error, {duplicate_upgrade_step, StepName}});
+ {error, {edge, {bad_vertex, StepName}, _From, _To}} ->
+ throw({error, {dependency_on_unknown_upgrade_step, StepName}});
+ {error, {edge, {bad_edge, StepNames}, _From, _To}} ->
+ throw({error, {cycle_in_upgrade_steps, StepNames}})
+ end.
vertices(Module, Steps) ->
[{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
@@ -93,10 +103,6 @@ vertices(Module, Steps) ->
edges(_Module, Steps) ->
[{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
-graph_build_error({vertex, duplicate, StepName}) ->
- exit({duplicate_upgrade, StepName});
-graph_build_error({edge, E, From, To}) ->
- exit({E, From, To}).
unknown_heads(Heads, G) ->
[H || H <- Heads, digraph:vertex(G, H) =:= false].
@@ -111,8 +117,8 @@ upgrades_to_apply(Heads, G) ->
sets:from_list(digraph_utils:reaching(Heads, G)))),
%% Form a subgraph from that list and find a topological ordering
%% so we can invoke them in order.
- [element(2, digraph:vertex(G, StepName))
- || StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+ [element(2, digraph:vertex(G, StepName)) ||
+ StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
heads(G) ->
lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
@@ -130,9 +136,9 @@ apply_upgrades(Upgrades) ->
ok = write_version(),
ok = file:delete(LockFile);
{error, eexist} ->
- exit(previous_upgrade_failed);
+ throw({error, previous_upgrade_failed});
{error, _} = Error ->
- exit(Error)
+ throw(Error)
apply_upgrade({M, F}) ->
@@ -141,16 +147,12 @@ apply_upgrade({M, F}) ->
%% -------------------------------------------------------------------
-schema_filename() ->
- filename:join(dir(), ?VERSION_FILENAME).
+dir() -> rabbit_mnesia:dir().
-lock_filename() ->
- filename:join(dir(), ?LOCK_FILENAME).
+schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).
+lock_filename() -> filename:join(dir(), ?LOCK_FILENAME).
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
-info(Msg, Args) ->
- error_logger:info_msg(Msg, Args).
-dir() ->
- rabbit_mnesia:dir().
+info(Msg, Args) -> error_logger:info_msg(Msg, Args).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 59b8705d..1c56d51d 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -24,28 +24,55 @@
--rabbit_upgrade({remove_user_scope, []}).
+-rabbit_upgrade({remove_user_scope, []}).
+-rabbit_upgrade({hash_passwords, []}).
+-rabbit_upgrade({add_ip_to_listener, []}).
%% -------------------------------------------------------------------
--spec(remove_user_scope/0 :: () -> 'ok').
+-spec(remove_user_scope/0 :: () -> 'ok').
+-spec(hash_passwords/0 :: () -> 'ok').
+-spec(add_ip_to_listener/0 :: () -> 'ok').
+%% It's a bad idea to use records or record_info here, even for the
+%% destination form. Because in the future, the destination form of
+%% your current transform may not match the record any more, and it
+%% would be messy to have to go back and fix old transforms at that
+%% point.
remove_user_scope() ->
- {atomic, ok} = mnesia:transform_table(
- rabbit_user_permission,
- fun (Perm = #user_permission{
- permission = {permission,
- _Scope, Conf, Write, Read}}) ->
- Perm#user_permission{
- permission = #permission{configure = Conf,
- write = Write,
- read = Read}}
- end,
- record_info(fields, user_permission)),
+ mnesia(
+ rabbit_user_permission,
+ fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) ->
+ {user_permission, UV, {permission, Conf, Write, Read}}
+ end,
+ [user_vhost, permission]).
+hash_passwords() ->
+ mnesia(
+ rabbit_user,
+ fun ({user, Username, Password, IsAdmin}) ->
+ Hash = rabbit_access_control:hash_password(Password),
+ {user, Username, Hash, IsAdmin}
+ end,
+ [username, password_hash, is_admin]).
+add_ip_to_listener() ->
+ mnesia(
+ rabbit_listener,
+ fun ({listener, Node, Protocol, Host, Port}) ->
+ {listener, Node, Protocol, Host, {0,0,0,0}, Port}
+ end,
+ [node, protocol, host, ip_address, port]).
+mnesia(TableName, Fun, FieldList) ->
+ {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList),