summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-10-23 11:50:01 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-10-23 11:50:01 +0100
commit12139d526696b1a9a5cd3bafcd04aaa71b7d3818 (patch)
treecfe1a1984401f83150f2855790bfe1aca220a72c
parent1fc6954cb5602f1cbef604e7ba4bfb410f80daba (diff)
parent27d34196460179deba4798c04f28579bc7e8f4c1 (diff)
downloadrabbitmq-server-12139d526696b1a9a5cd3bafcd04aaa71b7d3818.tar.gz
Merge bug25168
-rw-r--r--docs/rabbitmqctl.1.xml16
-rw-r--r--src/gm.erl113
-rw-r--r--src/gm_soak_test.erl4
-rw-r--r--src/gm_speed_test.erl3
-rw-r--r--src/gm_tests.erl10
-rw-r--r--src/rabbit.erl44
-rw-r--r--src/rabbit_amqqueue_process.erl19
-rw-r--r--src/rabbit_guid.erl6
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_tests.erl15
12 files changed, 145 insertions, 101 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 73347cea..23c392b7 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -313,8 +313,8 @@
linkend="stop_app"><command>stop_app</command></link>.
</para>
<para>
- Cluster nodes can be of two types: disk or RAM. Disk nodes
- replicate data in RAM and on disk, thus providing redundancy in
+ Cluster nodes can be of two types: disc or RAM. Disc nodes
+ replicate data in RAM and on disc, thus providing redundancy in
the event of node failure and recovery from global events such
as power failure across all nodes. RAM nodes replicate data in
RAM only (with the exception of queue contents, which can reside
@@ -322,10 +322,10 @@
and are mainly used for scalability. RAM nodes are more
performant only when managing resources (e.g. adding/removing
queues, exchanges, or bindings). A cluster must always have at
- least one disk node, and usually should have more than one.
+ least one disc node, and usually should have more than one.
</para>
<para>
- The node will be a disk node by default. If you wish to
+ The node will be a disc node by default. If you wish to
create a RAM node, provide the <command>--ram</command> flag.
</para>
<para>
@@ -367,18 +367,18 @@
</listitem>
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>change_cluster_node_type</command> <arg choice="req">disk | ram</arg></cmdsynopsis>
+ <term><cmdsynopsis><command>change_cluster_node_type</command> <arg choice="req">disc | ram</arg></cmdsynopsis>
</term>
<listitem>
<para>
Changes the type of the cluster node. The node must be stopped for
this operation to succeed, and when turning a node into a RAM node
- the node must not be the only disk node in the cluster.
+ the node must not be the only disc node in the cluster.
</para>
<para role="example-prefix">For example:</para>
- <screen role="example">rabbitmqctl change_cluster_node_type disk</screen>
+ <screen role="example">rabbitmqctl change_cluster_node_type disc</screen>
<para role="example">
- This command will turn a RAM node into a disk node.
+ This command will turn a RAM node into a disc node.
</para>
</listitem>
</varlistentry>
diff --git a/src/gm.erl b/src/gm.erl
index 90433e84..4a95de0d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -376,7 +376,7 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/3, leave/1, broadcast/2,
+-export([create_tables/0, start_link/4, leave/1, broadcast/2,
confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -408,7 +408,8 @@
callback_args,
confirms,
broadcast_buffer,
- broadcast_timer
+ broadcast_timer,
+ txn_executor
}).
-record(gm_group, { name, version, members }).
@@ -428,9 +429,10 @@
-export_type([group_name/0]).
-type(group_name() :: any()).
+-type(txn_fun() :: fun((fun(() -> any())) -> any())).
-spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
--spec(start_link/3 :: (group_name(), atom(), any()) ->
+-spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
@@ -507,8 +509,8 @@ table_definitions() ->
{Name, Attributes} = ?TABLE,
[{Name, [?TABLE_MATCH | Attributes]}].
-start_link(GroupName, Module, Args) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args], []).
+start_link(GroupName, Module, Args, TxnFun) ->
+ gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -529,7 +531,7 @@ forget_group(GroupName) ->
end),
ok.
-init([GroupName, Module, Args]) ->
+init([GroupName, Module, Args, TxnFun]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
@@ -545,7 +547,8 @@ init([GroupName, Module, Args]) ->
callback_args = Args,
confirms = queue:new(),
broadcast_buffer = [],
- broadcast_timer = undefined }, hibernate,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -585,7 +588,8 @@ handle_call({add_on_right, NewMember}, _From,
view = View,
members_state = MembersState,
module = Module,
- callback_args = Args }) ->
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
{MembersState1, Group} =
record_new_member_in_group(
GroupName, Self, NewMember,
@@ -596,7 +600,7 @@ handle_call({add_on_right, NewMember}, _From,
{catchup, Self,
prepare_members_state(MembersState1)}),
MembersState1
- end),
+ end, TxnFun),
View2 = group_to_view(Group),
State1 = check_neighbours(State #state { view = View2,
members_state = MembersState1 }),
@@ -642,8 +646,9 @@ handle_cast(join, State = #state { self = Self,
group_name = GroupName,
members_state = undefined,
module = Module,
- callback_args = Args }) ->
- View = join_group(Self, GroupName),
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
+ View = join_group(Self, GroupName, TxnFun),
MembersState =
case alive_view_members(View) of
[Self] -> blank_member_state();
@@ -670,7 +675,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
view = View,
module = Module,
callback_args = Args,
- confirms = Confirms }) ->
+ confirms = Confirms,
+ txn_executor = TxnFun }) ->
Member = case {Left, Right} of
{{Member1, MRef}, _} -> Member1;
{_, {Member1, MRef}} -> Member1;
@@ -683,7 +689,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
noreply(State);
_ ->
View1 =
- group_to_view(record_dead_member_in_group(Member, GroupName)),
+ group_to_view(record_dead_member_in_group(Member,
+ GroupName, TxnFun)),
{Result, State2} =
case alive_view_members(View1) of
[Self] ->
@@ -985,14 +992,15 @@ ensure_alive_suffix1(MembersQ) ->
%% View modification
%% ---------------------------------------------------------------------------
-join_group(Self, GroupName) ->
- join_group(Self, GroupName, read_group(GroupName)).
+join_group(Self, GroupName, TxnFun) ->
+ join_group(Self, GroupName, read_group(GroupName), TxnFun).
-join_group(Self, GroupName, {error, not_found}) ->
- join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
-join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) ->
+join_group(Self, GroupName, {error, not_found}, TxnFun) ->
+ join_group(Self, GroupName,
+ prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
+join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
group_to_view(Group);
-join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
+join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
case lists:member(Self, Members) of
true ->
group_to_view(Group);
@@ -1000,20 +1008,22 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
case lists:filter(fun is_member_alive/1, Members) of
[] ->
join_group(Self, GroupName,
- prune_or_create_group(Self, GroupName));
+ prune_or_create_group(Self, GroupName, TxnFun));
Alive ->
Left = lists:nth(random:uniform(length(Alive)), Alive),
Handler =
fun () ->
join_group(
Self, GroupName,
- record_dead_member_in_group(Left, GroupName))
+ record_dead_member_in_group(
+ Left, GroupName, TxnFun),
+ TxnFun)
end,
try
case gen_server2:call(
get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
- not_ready -> join_group(Self, GroupName)
+ not_ready -> join_group(Self, GroupName, TxnFun)
end
catch
exit:{R, _}
@@ -1032,29 +1042,29 @@ read_group(GroupName) ->
[Group] -> Group
end.
-prune_or_create_group(Self, GroupName) ->
- {atomic, Group} =
- mnesia:sync_transaction(
- fun () -> GroupNew = #gm_group { name = GroupName,
- members = [Self],
- version = ?VERSION_START },
- case mnesia:read({?GROUP_TABLE, GroupName}) of
- [] ->
- mnesia:write(GroupNew),
- GroupNew;
- [Group1 = #gm_group { members = Members }] ->
- case lists:any(fun is_member_alive/1, Members) of
- true -> Group1;
- false -> mnesia:write(GroupNew),
- GroupNew
- end
- end
- end),
+prune_or_create_group(Self, GroupName, TxnFun) ->
+ Group = TxnFun(
+ fun () ->
+ GroupNew = #gm_group { name = GroupName,
+ members = [Self],
+ version = ?VERSION_START },
+ case mnesia:read({?GROUP_TABLE, GroupName}) of
+ [] ->
+ mnesia:write(GroupNew),
+ GroupNew;
+ [Group1 = #gm_group { members = Members }] ->
+ case lists:any(fun is_member_alive/1, Members) of
+ true -> Group1;
+ false -> mnesia:write(GroupNew),
+ GroupNew
+ end
+ end
+ end),
Group.
-record_dead_member_in_group(Member, GroupName) ->
- {atomic, Group} =
- mnesia:sync_transaction(
+record_dead_member_in_group(Member, GroupName, TxnFun) ->
+ Group =
+ TxnFun(
fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
mnesia:read({?GROUP_TABLE, GroupName}),
case lists:splitwith(
@@ -1071,9 +1081,9 @@ record_dead_member_in_group(Member, GroupName) ->
end),
Group.
-record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
- {atomic, {Result, Group}} =
- mnesia:sync_transaction(
+record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) ->
+ {Result, Group} =
+ TxnFun(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
mnesia:read({?GROUP_TABLE, GroupName}),
@@ -1088,10 +1098,10 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
end),
{Result, Group}.
-erase_members_in_group(Members, GroupName) ->
+erase_members_in_group(Members, GroupName, TxnFun) ->
DeadMembers = [{dead, Id} || Id <- Members],
- {atomic, Group} =
- mnesia:sync_transaction(
+ Group =
+ TxnFun(
fun () ->
[Group1 = #gm_group { members = [_|_] = Members1,
version = Ver }] =
@@ -1112,7 +1122,8 @@ maybe_erase_aliases(State = #state { self = Self,
view = View0,
members_state = MembersState,
module = Module,
- callback_args = Args }, View) ->
+ callback_args = Args,
+ txn_executor = TxnFun }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
@@ -1129,7 +1140,7 @@ maybe_erase_aliases(State = #state { self = Self,
case Erasable of
[] -> {ok, State1 #state { view = View }};
_ -> View1 = group_to_view(
- erase_members_in_group(Erasable, GroupName)),
+ erase_members_in_group(Erasable, GroupName, TxnFun)),
{callback_view_changed(Args, Module, View0, View1),
check_neighbours(State1 #state { view = View1 })}
end.
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 57217541..5fbfc223 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -105,7 +105,9 @@ spawn_member() ->
random:seed(MegaSecs, Secs, MicroSecs),
%% start up delay of no more than 10 seconds
timer:sleep(random:uniform(10000)),
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
+ {ok, Pid} = gm:start_link(
+ ?MODULE, ?MODULE, [],
+ fun rabbit_misc:execute_mnesia_transaction/1),
Start = random:uniform(10000),
send_loop(Pid, Start, Start + random:uniform(10000)),
gm:leave(Pid),
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index dad75bd4..84d4ab2f 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -44,7 +44,8 @@ terminate(Owner, _Reason) ->
%% other
wile_e_coyote(Time, WriteUnit) ->
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive joined -> ok end,
timer:sleep(1000), %% wait for all to join
timer:send_after(Time, stop),
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index 0a2d4204..a9c0ba90 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -76,7 +76,9 @@ test_confirmed_broadcast() ->
test_member_death() ->
with_two_members(
fun (Pid, Pid2) ->
- {ok, Pid3} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid3} = gm:start_link(
+ ?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid3, [Pid, Pid2, Pid3],
timeout_joining_gm_group_3),
passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1),
@@ -128,10 +130,12 @@ test_broadcast_fun(Fun) ->
with_two_members(Fun) ->
ok = gm:create_tables(),
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1),
- {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2),
passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 87abb9dc..69f77824 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -300,9 +300,9 @@ start() ->
%% We do not want to HiPE compile or upgrade
%% mnesia after just restarting the app
ok = ensure_application_loaded(),
- ok = rabbit_node_monitor:prepare_cluster_status_files(),
- ok = rabbit_mnesia:check_cluster_consistency(),
ok = ensure_working_log_handlers(),
+ rabbit_node_monitor:prepare_cluster_status_files(),
+ rabbit_mnesia:check_cluster_consistency(),
ok = app_utils:start_applications(
app_startup_order(), fun handle_app_error/2),
ok = print_plugin_info(rabbit_plugins:active())
@@ -312,13 +312,13 @@ boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
maybe_hipe_compile(),
- ok = rabbit_node_monitor:prepare_cluster_status_files(),
ok = ensure_working_log_handlers(),
+ rabbit_node_monitor:prepare_cluster_status_files(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
%% It's important that the consistency check happens after
%% the upgrade, since if we are a secondary node the
%% primary node will have forgotten us
- ok = rabbit_mnesia:check_cluster_consistency(),
+ rabbit_mnesia:check_cluster_consistency(),
Plugins = rabbit_plugins:setup(),
ToBeLoaded = Plugins ++ ?APPS,
ok = app_utils:load_applications(ToBeLoaded),
@@ -330,14 +330,19 @@ boot() ->
end).
handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
- boot_error({could_not_start, App, Reason}, not_available);
+ throw({could_not_start, App, Reason});
handle_app_error(App, Reason) ->
- boot_error({could_not_start, App, Reason}, not_available).
+ throw({could_not_start, App, Reason}).
start_it(StartFun) ->
try
StartFun()
+ catch
+ throw:{could_not_start, _App, _Reason}=Err ->
+ boot_error(Err, not_available);
+ _:Reason ->
+ boot_error(Reason, erlang:get_stacktrace())
after
%% give the error loggers some time to catch up
timer:sleep(100)
@@ -501,13 +506,16 @@ sort_boot_steps(UnsortedSteps) ->
not erlang:function_exported(M, F, length(A))] of
[] -> SortedSteps;
MissingFunctions -> basic_boot_error(
+ {missing_functions, MissingFunctions},
"Boot step functions not exported: ~p~n",
[MissingFunctions])
end;
{error, {vertex, duplicate, StepName}} ->
- basic_boot_error("Duplicate boot step name: ~w~n", [StepName]);
+ basic_boot_error({duplicate_boot_step, StepName},
+ "Duplicate boot step name: ~w~n", [StepName]);
{error, {edge, Reason, From, To}} ->
basic_boot_error(
+ {invalid_boot_step_dependency, From, To},
"Could not add boot step dependency of ~w on ~w:~n~s",
[To, From,
case Reason of
@@ -521,7 +529,7 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
-boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
{Err, Nodes} =
case AllNodes -- [node()] of
@@ -532,23 +540,27 @@ boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
"Timeout contacting cluster nodes: ~p.~n", [Ns]),
Ns}
end,
- basic_boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
-
+ basic_boot_error(Term,
+ Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
boot_error(Reason, Stacktrace) ->
- Fmt = "Error description:~n ~p~n~n"
+ Fmt = "Error description:~n ~p~n~n" ++
"Log files (may contain more information):~n ~s~n ~s~n~n",
Args = [Reason, log_location(kernel), log_location(sasl)],
+ boot_error(Reason, Fmt, Args, Stacktrace).
+
+boot_error(Reason, Fmt, Args, Stacktrace) ->
case Stacktrace of
- not_available -> basic_boot_error(Fmt, Args);
- _ -> basic_boot_error(Fmt ++ "Stack trace:~n ~p~n~n",
+ not_available -> basic_boot_error(Reason, Fmt, Args);
+ _ -> basic_boot_error(Reason, Fmt ++
+ "Stack trace:~n ~p~n~n",
Args ++ [Stacktrace])
end.
-basic_boot_error(Format, Args) ->
+basic_boot_error(Reason, Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
- error_logger:error_msg(Format, Args),
+ rabbit_misc:local_info_msg(Format, Args),
timer:sleep(1000),
- exit({?MODULE, failure_during_boot}).
+ exit({?MODULE, failure_during_boot, Reason}).
%%---------------------------------------------------------------------------
%% boot step functions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9def64ff..68f95778 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -695,16 +695,15 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} =
- case DLXFun of
- undefined ->
- {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ ->
- {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
- DLXFun(Msgs),
- {Next, BQS2}
- end,
+ {Props, BQS1} = case DLXFun of
+ undefined -> {Next, undefined, BQS2} =
+ BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ -> {Next, Msgs, BQS2} =
+ BQ:dropwhile(ExpirePred, true, BQS),
+ DLXFun(Msgs),
+ {Next, BQS2}
+ end,
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index ba0cb04f..cedbbdb3 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -144,11 +144,7 @@ gen_secure() ->
%% employs base64url encoding, which is safer in more contexts than
%% plain base64.
string(G, Prefix) ->
- Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
- ($\/, Acc) -> [$\_ | Acc];
- ($\=, Acc) -> Acc;
- (Chr, Acc) -> [Chr | Acc]
- end, [], base64:encode_to_string(G)).
+ Prefix ++ "-" ++ rabbit_misc:base64url(G).
binary(G, Prefix) ->
list_to_binary(string(G, Prefix)).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 16690693..e1a21cf7 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -326,7 +326,9 @@ ensure_monitoring(CPid, Pids) ->
init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
GM1 = case GM of
undefined ->
- {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
+ {ok, GM2} = gm:start_link(
+ QueueName, ?MODULE, [self()],
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive {joined, GM2, _Members} ->
ok
end,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e763bd9b..1ba1420f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -99,7 +99,8 @@ init(Q = #amqqueue { name = QName }) ->
%% above.
%%
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QName, ?MODULE, [self()]),
+ {ok, GM} = gm:start_link(QName, ?MODULE, [self()],
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index a0536a50..ab9a9ceb 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -63,6 +63,7 @@
-export([version/0]).
-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
+-export([base64url/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -227,6 +228,7 @@
-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
-spec(json_to_term/1 :: (any()) -> any()).
-spec(term_to_json/1 :: (any()) -> any()).
+-spec(base64url/1 :: (binary()) -> string()).
-endif.
@@ -987,3 +989,10 @@ term_to_json(L) when is_list(L) ->
term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
+
+base64url(In) ->
+ lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
+ ($\/, Acc) -> [$\_ | Acc];
+ ($\=, Acc) -> Acc;
+ (Chr, Acc) -> [Chr | Acc]
+ end, [], base64:encode_to_string(In))).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index aa48f228..e6054853 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -774,7 +774,9 @@ test_log_management_during_startup() ->
ok = case catch control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
log_rotation_tty_no_handlers_test});
- {error, {cannot_log_to_tty, _, _}} -> ok
+ {badrpc, {'EXIT', {rabbit,failure_during_boot,
+ {error,{cannot_log_to_tty,
+ _, not_installed}}}}} -> ok
end,
%% fix sasl logging
@@ -798,7 +800,9 @@ test_log_management_during_startup() ->
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
log_rotation_no_write_permission_dir_test});
- {error, {cannot_log_to_file, _, _}} -> ok
+ {badrpc, {'EXIT',
+ {rabbit, failure_during_boot,
+ {error, {cannot_log_to_file, _, _}}}}} -> ok
end,
%% start application with logging to a subdirectory which
@@ -809,8 +813,11 @@ test_log_management_during_startup() ->
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
log_rotatation_parent_dirs_test});
- {error, {cannot_log_to_file, _,
- {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok
+ {badrpc,
+ {'EXIT', {rabbit,failure_during_boot,
+ {error, {cannot_log_to_file, _,
+ {error,
+ {cannot_create_parent_dirs, _, eacces}}}}}}} -> ok
end,
ok = set_permissions(TmpDir, 8#00700),
ok = set_permissions(TmpLog, 8#00600),