diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-12-02 15:43:10 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-12-02 15:43:10 +0000 |
commit | 173a6f2b0d85332ab71c6a113f69b99f559265fe (patch) | |
tree | 2d7f979a637c7fdfb6534178a13e06f542edbfb4 | |
parent | 4aec8bbe162382ada2befc813006ec01e6fa78aa (diff) | |
parent | 79a2e2da9bfc918f48b7e6ed92f1f573c4afae8a (diff) | |
download | rabbitmq-server-173a6f2b0d85332ab71c6a113f69b99f559265fe.tar.gz |
Merge bug 24561 (x-ha-policy=nodes doesn't fully cope with failure of master)
-rw-r--r-- | docs/rabbitmqctl.1.xml | 16 | ||||
-rw-r--r-- | src/mirrored_supervisor_tests.erl | 10 | ||||
-rw-r--r-- | src/rabbit_control.erl | 24 | ||||
-rw-r--r-- | src/rabbit_guid.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 2 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 19 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 10 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 30 |
8 files changed, 92 insertions, 28 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index f21888bd..15755038 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1315,6 +1315,22 @@ </para> </listitem> </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>eval</command> <arg choice="req"><replaceable>expr</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Evaluate an arbitrary Erlang expression. + </para> + <para role="example-prefix"> + For example: + </para> + <screen role="example">rabbitmqctl eval 'node().'</screen> + <para role="example"> + This command returns the name of the node to which rabbitmqctl has connected. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index 0900f56f..b8d52ae8 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -36,7 +36,9 @@ all_tests() -> passed = test_already_there(), passed = test_delete_restart(), passed = test_which_children(), - passed = test_large_group(), +%% commented out in order to determine whether this is the only test +%% that is failing - see bug 24362 +%% passed = test_large_group(), passed = test_childspecs_at_init(), passed = test_anonymous_supervisors(), passed = test_no_migration_on_shutdown(), @@ -158,7 +160,7 @@ test_no_migration_on_shutdown() -> try call(worker, ping), exit(worker_should_not_have_migrated) - catch exit:{timeout_waiting_for_server, _} -> + catch exit:{timeout_waiting_for_server, _, _} -> ok end end, [evil, good]). @@ -245,10 +247,10 @@ inc_group() -> get_group(Group) -> {Group, get(counter)}. -call(Id, Msg) -> call(Id, Msg, 100, 10). +call(Id, Msg) -> call(Id, Msg, 1000, 100). call(Id, Msg, 0, _Decr) -> - exit({timeout_waiting_for_server, {Id, Msg}}); + exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()}); call(Id, Msg, MaxDelay, Decr) -> try diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index fa8dd262..20486af5 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -98,6 +98,9 @@ start() -> {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); + {error_string, Reason} -> + print_error("~s", [Reason]), + rabbit_misc:quit(2); {badrpc, {'EXIT', Reason}} -> print_error("~p", [Reason]), rabbit_misc:quit(2); @@ -368,7 +371,23 @@ action(report, Node, _Args, _Opts, Inform) -> [print_report(Node, Q) || Q <- ?GLOBAL_QUERIES], [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts], io:format("End of server status report~n"), - ok. + ok; + +action(eval, Node, [Expr], _Opts, _Inform) -> + case erl_scan:string(Expr) of + {ok, Scanned, _} -> + case erl_parse:parse_exprs(Scanned) of + {ok, Parsed} -> + {value, Value, _} = unsafe_rpc( + Node, erl_eval, exprs, [Parsed, []]), + io:format("~p~n", [Value]), + ok; + {error, E} -> + {error_string, format_parse_error(E)} + end; + {error, E, _} -> + {error_string, format_parse_error(E)} + end. %%---------------------------------------------------------------------------- @@ -443,6 +462,9 @@ system(Cmd) -> escape_quotes(Cmd) -> lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). +format_parse_error({_Line, Mod, Err}) -> + lists:flatten(Mod:format_error(Err)). + %%---------------------------------------------------------------------------- default_if_empty(List, Default) when is_list(List) -> diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 523af749..2d0f5014 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -89,8 +89,15 @@ guid() -> erlang:md5(term_to_binary(G)). %% generate a readable string representation of a GUID. +%% +%% employs base64url encoding, which is safer in more contexts than +%% plain base64. string_guid(Prefix) -> - Prefix ++ "-" ++ base64:encode_to_string(guid()). + Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; + ($\/, Acc) -> [$\_ | Acc]; + ($\=, Acc) -> Acc; + (Chr, Acc) -> [Chr | Acc] + end, [], base64:encode_to_string(guid())). binstring_guid(Prefix) -> list_to_binary(string_guid(Prefix)). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 7182042d..73eaed14 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -725,7 +725,7 @@ process_instruction( never -> {MQ2, PendingCh, MS}; eventually -> - {MQ2, sets:add_element(MsgId, PendingCh), + {MQ2, PendingCh, dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; immediately -> ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 88192e8f..0578cf7d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -250,18 +250,23 @@ assert_args_equivalence(Orig, New, Name, Keys) -> ok. assert_args_equivalence1(Orig, New, Name, Key) -> - case {table_lookup(Orig, Key), table_lookup(New, Key)} of + {Orig1, New1} = {table_lookup(Orig, Key), table_lookup(New, Key)}, + FailureFun = fun () -> + protocol_error(precondition_failed, "inequivalent arg '~s'" + "for ~s: received ~s but current is ~s", + [Key, rs(Name), val(New1), val(Orig1)]) + end, + case {Orig1, New1} of {Same, Same} -> ok; - {{OrigType, OrigVal} = Orig1, {NewType, NewVal} = New1} -> + {{OrigType, OrigVal}, {NewType, NewVal}} -> case type_class(OrigType) == type_class(NewType) andalso OrigVal == NewVal of true -> ok; - false -> protocol_error(precondition_failed, "inequivalent arg" - " '~s' for ~s: received ~s but current" - " is ~s", - [Key, rs(Name), val(New1), val(Orig1)]) - end + false -> FailureFun() + end; + {_, _} -> + FailureFun() end. val(undefined) -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2c0912df..045ab89a 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -307,9 +307,15 @@ connections() -> rabbit_networking, connections_local, []). connections_local() -> - [rabbit_connection_sup:reader(ConnSup) || + [Reader || {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup)]. + <- supervisor:which_children(rabbit_tcp_client_sup), + Reader <- [try + rabbit_connection_sup:reader(ConnSup) + catch exit:{noproc, _} -> + noproc + end], + Reader =/= noproc]. connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4b545466..f03c1d1c 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -505,7 +505,10 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> [begin ok = gatherer:fork(Gatherer), ok = worker_pool:submit_async( - fun () -> queue_index_walker_reader(QueueName, Gatherer) + fun () -> link(Gatherer), + ok = queue_index_walker_reader(QueueName, Gatherer), + unlink(Gatherer), + ok end) end || QueueName <- DurableQueues], queue_index_walker({next, Gatherer}); @@ -837,13 +840,16 @@ segment_entries_foldr(Fun, Init, %% %% Does not do any combining with the journal at all. load_segment(KeepAcked, #segment { path = Path }) -> + Empty = {array_new(), 0}, case rabbit_file:is_file(Path) of - false -> {array_new(), 0}; + false -> Empty; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - {ok, SegData} = file_handle_cache:read( - Hdl, ?SEGMENT_TOTAL_SIZE), - Res = load_segment_entries(KeepAcked, SegData, array_new(), 0), + Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of + {ok, SegData} -> load_segment_entries( + KeepAcked, SegData, Empty); + eof -> Empty + end, ok = file_handle_cache:close(Hdl), Res end. @@ -853,15 +859,15 @@ load_segment_entries(KeepAcked, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, PubRecordBody:?PUB_RECORD_BODY_BYTES/binary, SegData/binary>>, - SegEntries, UnackedCount) -> + {SegEntries, UnackedCount}) -> {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody), Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), - load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1); + load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1}); load_segment_entries(KeepAcked, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS, SegData/binary>>, - SegEntries, UnackedCount) -> + {SegEntries, UnackedCount}) -> {UnackedCountDelta, SegEntries1} = case array:get(RelSeq, SegEntries) of {Pub, no_del, no_ack} -> @@ -871,10 +877,10 @@ load_segment_entries(KeepAcked, {_Pub, del, no_ack} -> {-1, array:reset(RelSeq, SegEntries)} end, - load_segment_entries(KeepAcked, SegData, SegEntries1, - UnackedCount + UnackedCountDelta); -load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) -> - {SegEntries, UnackedCount}. + load_segment_entries(KeepAcked, SegData, + {SegEntries1, UnackedCount + UnackedCountDelta}); +load_segment_entries(_KeepAcked, _SegData, Res) -> + Res. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). |