summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-12-02 15:43:10 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-12-02 15:43:10 +0000
commit173a6f2b0d85332ab71c6a113f69b99f559265fe (patch)
tree2d7f979a637c7fdfb6534178a13e06f542edbfb4
parent4aec8bbe162382ada2befc813006ec01e6fa78aa (diff)
parent79a2e2da9bfc918f48b7e6ed92f1f573c4afae8a (diff)
downloadrabbitmq-server-173a6f2b0d85332ab71c6a113f69b99f559265fe.tar.gz
Merge bug 24561 (x-ha-policy=nodes doesn't fully cope with failure of master)
-rw-r--r--docs/rabbitmqctl.1.xml16
-rw-r--r--src/mirrored_supervisor_tests.erl10
-rw-r--r--src/rabbit_control.erl24
-rw-r--r--src/rabbit_guid.erl9
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_misc.erl19
-rw-r--r--src/rabbit_networking.erl10
-rw-r--r--src/rabbit_queue_index.erl30
8 files changed, 92 insertions, 28 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index f21888bd..15755038 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1315,6 +1315,22 @@
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>eval</command> <arg choice="req"><replaceable>expr</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Evaluate an arbitrary Erlang expression.
+ </para>
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">rabbitmqctl eval 'node().'</screen>
+ <para role="example">
+ This command returns the name of the node to which rabbitmqctl has connected.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 0900f56f..b8d52ae8 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -36,7 +36,9 @@ all_tests() ->
passed = test_already_there(),
passed = test_delete_restart(),
passed = test_which_children(),
- passed = test_large_group(),
+%% commented out in order to determine whether this is the only test
+%% that is failing - see bug 24362
+%% passed = test_large_group(),
passed = test_childspecs_at_init(),
passed = test_anonymous_supervisors(),
passed = test_no_migration_on_shutdown(),
@@ -158,7 +160,7 @@ test_no_migration_on_shutdown() ->
try
call(worker, ping),
exit(worker_should_not_have_migrated)
- catch exit:{timeout_waiting_for_server, _} ->
+ catch exit:{timeout_waiting_for_server, _, _} ->
ok
end
end, [evil, good]).
@@ -245,10 +247,10 @@ inc_group() ->
get_group(Group) ->
{Group, get(counter)}.
-call(Id, Msg) -> call(Id, Msg, 100, 10).
+call(Id, Msg) -> call(Id, Msg, 1000, 100).
call(Id, Msg, 0, _Decr) ->
- exit({timeout_waiting_for_server, {Id, Msg}});
+ exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
call(Id, Msg, MaxDelay, Decr) ->
try
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index fa8dd262..20486af5 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -98,6 +98,9 @@ start() ->
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
+ {error_string, Reason} ->
+ print_error("~s", [Reason]),
+ rabbit_misc:quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
@@ -368,7 +371,23 @@ action(report, Node, _Args, _Opts, Inform) ->
[print_report(Node, Q) || Q <- ?GLOBAL_QUERIES],
[print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
io:format("End of server status report~n"),
- ok.
+ ok;
+
+action(eval, Node, [Expr], _Opts, _Inform) ->
+ case erl_scan:string(Expr) of
+ {ok, Scanned, _} ->
+ case erl_parse:parse_exprs(Scanned) of
+ {ok, Parsed} ->
+ {value, Value, _} = unsafe_rpc(
+ Node, erl_eval, exprs, [Parsed, []]),
+ io:format("~p~n", [Value]),
+ ok;
+ {error, E} ->
+ {error_string, format_parse_error(E)}
+ end;
+ {error, E, _} ->
+ {error_string, format_parse_error(E)}
+ end.
%%----------------------------------------------------------------------------
@@ -443,6 +462,9 @@ system(Cmd) ->
escape_quotes(Cmd) ->
lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
+format_parse_error({_Line, Mod, Err}) ->
+ lists:flatten(Mod:format_error(Err)).
+
%%----------------------------------------------------------------------------
default_if_empty(List, Default) when is_list(List) ->
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 523af749..2d0f5014 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -89,8 +89,15 @@ guid() ->
erlang:md5(term_to_binary(G)).
%% generate a readable string representation of a GUID.
+%%
+%% employs base64url encoding, which is safer in more contexts than
+%% plain base64.
string_guid(Prefix) ->
- Prefix ++ "-" ++ base64:encode_to_string(guid()).
+ Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
+ ($\/, Acc) -> [$\_ | Acc];
+ ($\=, Acc) -> Acc;
+ (Chr, Acc) -> [Chr | Acc]
+ end, [], base64:encode_to_string(guid())).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 7182042d..73eaed14 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -725,7 +725,7 @@ process_instruction(
never ->
{MQ2, PendingCh, MS};
eventually ->
- {MQ2, sets:add_element(MsgId, PendingCh),
+ {MQ2, PendingCh,
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
immediately ->
ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 88192e8f..0578cf7d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -250,18 +250,23 @@ assert_args_equivalence(Orig, New, Name, Keys) ->
ok.
assert_args_equivalence1(Orig, New, Name, Key) ->
- case {table_lookup(Orig, Key), table_lookup(New, Key)} of
+ {Orig1, New1} = {table_lookup(Orig, Key), table_lookup(New, Key)},
+ FailureFun = fun () ->
+ protocol_error(precondition_failed, "inequivalent arg '~s'"
+ "for ~s: received ~s but current is ~s",
+ [Key, rs(Name), val(New1), val(Orig1)])
+ end,
+ case {Orig1, New1} of
{Same, Same} ->
ok;
- {{OrigType, OrigVal} = Orig1, {NewType, NewVal} = New1} ->
+ {{OrigType, OrigVal}, {NewType, NewVal}} ->
case type_class(OrigType) == type_class(NewType) andalso
OrigVal == NewVal of
true -> ok;
- false -> protocol_error(precondition_failed, "inequivalent arg"
- " '~s' for ~s: received ~s but current"
- " is ~s",
- [Key, rs(Name), val(New1), val(Orig1)])
- end
+ false -> FailureFun()
+ end;
+ {_, _} ->
+ FailureFun()
end.
val(undefined) ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 2c0912df..045ab89a 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -307,9 +307,15 @@ connections() ->
rabbit_networking, connections_local, []).
connections_local() ->
- [rabbit_connection_sup:reader(ConnSup) ||
+ [Reader ||
{_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup)].
+ <- supervisor:which_children(rabbit_tcp_client_sup),
+ Reader <- [try
+ rabbit_connection_sup:reader(ConnSup)
+ catch exit:{noproc, _} ->
+ noproc
+ end],
+ Reader =/= noproc].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4b545466..f03c1d1c 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -505,7 +505,10 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
[begin
ok = gatherer:fork(Gatherer),
ok = worker_pool:submit_async(
- fun () -> queue_index_walker_reader(QueueName, Gatherer)
+ fun () -> link(Gatherer),
+ ok = queue_index_walker_reader(QueueName, Gatherer),
+ unlink(Gatherer),
+ ok
end)
end || QueueName <- DurableQueues],
queue_index_walker({next, Gatherer});
@@ -837,13 +840,16 @@ segment_entries_foldr(Fun, Init,
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
+ Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
- false -> {array_new(), 0};
+ false -> Empty;
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- {ok, SegData} = file_handle_cache:read(
- Hdl, ?SEGMENT_TOTAL_SIZE),
- Res = load_segment_entries(KeepAcked, SegData, array_new(), 0),
+ Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
+ {ok, SegData} -> load_segment_entries(
+ KeepAcked, SegData, Empty);
+ eof -> Empty
+ end,
ok = file_handle_cache:close(Hdl),
Res
end.
@@ -853,15 +859,15 @@ load_segment_entries(KeepAcked,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
SegData/binary>>,
- SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}) ->
{MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1);
+ load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
load_segment_entries(KeepAcked,
<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}) ->
{UnackedCountDelta, SegEntries1} =
case array:get(RelSeq, SegEntries) of
{Pub, no_del, no_ack} ->
@@ -871,10 +877,10 @@ load_segment_entries(KeepAcked,
{_Pub, del, no_ack} ->
{-1, array:reset(RelSeq, SegEntries)}
end,
- load_segment_entries(KeepAcked, SegData, SegEntries1,
- UnackedCount + UnackedCountDelta);
-load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) ->
- {SegEntries, UnackedCount}.
+ load_segment_entries(KeepAcked, SegData,
+ {SegEntries1, UnackedCount + UnackedCountDelta});
+load_segment_entries(_KeepAcked, _SegData, Res) ->
+ Res.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).