summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-12-05 16:25:02 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-12-05 16:25:02 +0000
commite7c92539cf29b94e427a0d6ed86db6d9fd7195a7 (patch)
tree61a75c78f162bf68ae24c3410c5e0a9c7916b4d6
parentde34eff32d768795895556dc18c3c2c17558feb8 (diff)
parent138b2609f984c69e714b192a9142e7a0f9dabc9f (diff)
downloadrabbitmq-server-e7c92539cf29b94e427a0d6ed86db6d9fd7195a7.tar.gz
merge default into bug24494
-rw-r--r--INSTALL2
-rw-r--r--INSTALL.in10
-rw-r--r--Makefile9
-rw-r--r--README1
-rw-r--r--README.in10
-rw-r--r--docs/rabbitmqctl.1.xml16
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/macports/Portfile.in20
-rw-r--r--src/mirrored_supervisor_tests.erl10
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_control.erl24
-rw-r--r--src/rabbit_guid.erl19
-rw-r--r--src/rabbit_mirror_queue_slave.erl10
-rw-r--r--src/rabbit_misc.erl30
-rw-r--r--src/rabbit_networking.erl10
-rw-r--r--src/rabbit_plugins.erl25
-rw-r--r--src/rabbit_queue_index.erl30
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/supervisor2.erl72
20 files changed, 217 insertions, 94 deletions
diff --git a/INSTALL b/INSTALL
new file mode 100644
index 00000000..be34498e
--- /dev/null
+++ b/INSTALL
@@ -0,0 +1,2 @@
+Please see http://www.rabbitmq.com/download.html for links to guides
+to installing RabbitMQ.
diff --git a/INSTALL.in b/INSTALL.in
deleted file mode 100644
index d1fa81df..00000000
--- a/INSTALL.in
+++ /dev/null
@@ -1,10 +0,0 @@
-Please see http://www.rabbitmq.com/install.html for install
-instructions.
-
-For your convenience, a text copy of these instructions is available
-below. Please be aware that the instructions here may not be as up to
-date as those at the above URL.
-
-===========================================================================
-
-
diff --git a/Makefile b/Makefile
index 2238622c..bf891a45 100644
--- a/Makefile
+++ b/Makefile
@@ -245,13 +245,7 @@ stop-cover: all
srcdist: distclean
mkdir -p $(TARGET_SRC_DIR)/codegen
- cp -r ebin src include LICENSE LICENSE-MPL-RabbitMQ $(TARGET_SRC_DIR)
- cp INSTALL.in $(TARGET_SRC_DIR)/INSTALL
- elinks -dump -no-references -no-numbering $(WEB_URL)install.html \
- >> $(TARGET_SRC_DIR)/INSTALL
- cp README.in $(TARGET_SRC_DIR)/README
- elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
- >> $(TARGET_SRC_DIR)/README
+ cp -r ebin src include LICENSE LICENSE-MPL-RabbitMQ INSTALL README $(TARGET_SRC_DIR)
sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
@@ -368,4 +362,3 @@ ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" ""
endif
.PHONY: run-qc
-
diff --git a/README b/README
new file mode 100644
index 00000000..67e3a66a
--- /dev/null
+++ b/README
@@ -0,0 +1 @@
+Please see http://www.rabbitmq.com/build-server.html for build instructions.
diff --git a/README.in b/README.in
deleted file mode 100644
index 0e70d0e7..00000000
--- a/README.in
+++ /dev/null
@@ -1,10 +0,0 @@
-Please see http://www.rabbitmq.com/build-server.html for build
-instructions.
-
-For your convenience, a text copy of these instructions is available
-below. Please be aware that the instructions here may not be as up to
-date as those at the above URL.
-
-===========================================================================
-
-
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index f21888bd..15755038 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1315,6 +1315,22 @@
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>eval</command> <arg choice="req"><replaceable>expr</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Evaluate an arbitrary Erlang expression.
+ </para>
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">rabbitmqctl eval 'node().'</screen>
+ <para role="example">
+ This command returns the name of the node to which rabbitmqctl has connected.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index fb27e9bd..96d3974f 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -121,6 +121,9 @@ done
rm -rf %{buildroot}
%changelog
+* Tue Nov 8 2011 steve@rabbitmq.com 2.7.0-1
+- New Upstream Release
+
* Fri Sep 9 2011 tim@rabbitmq.com 2.6.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 8f526544..ceb08ed0 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.7.0-1) natty; urgency=low
+
+ * New Upstream Release
+
+ -- Steve Powell <steve@rabbitmq.com> Tue, 08 Nov 2011 16:47:50 +0000
+
rabbitmq-server (2.6.1-1) natty; urgency=low
* New Upstream Release
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index 03f087d9..b6dad357 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -92,21 +92,15 @@ post-destroot {
xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
${wrappersbin}/rabbitmq-server
-
- reinplace -E "s:MACPORTS_PREFIX/bin:${prefix}/bin:" \
- ${filespath}/rabbitmq-script-wrapper
- reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
- ${filespath}/rabbitmq-script-wrapper
- reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
- ${filespath}/rabbitmq-script-wrapper
-
- xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
+ reinplace -E "s:MACPORTS_PREFIX/bin:${prefix}/bin:g" \
+ ${wrappersbin}/rabbitmq-server
+ reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:g" \
+ ${wrappersbin}/rabbitmq-server
+ reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:g" \
${wrappersbin}/rabbitmq-server
- xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
- ${wrappersbin}/rabbitmqctl
- xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
- ${wrappersbin}/rabbitmq-plugins
+ file copy ${wrappersbin}/rabbitmq-server ${wrappersbin}/rabbitmqctl
+ file copy ${wrappersbin}/rabbitmq-server ${wrappersbin}/rabbitmq-plugins
xinstall -m 644 -W ${mansrc}/man1 rabbitmq-server.1.gz rabbitmqctl.1.gz rabbitmq-plugins.1.gz \
${mandest}/man1/
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 0900f56f..b8d52ae8 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -36,7 +36,9 @@ all_tests() ->
passed = test_already_there(),
passed = test_delete_restart(),
passed = test_which_children(),
- passed = test_large_group(),
+%% commented out in order to determine whether this is the only test
+%% that is failing - see bug 24362
+%% passed = test_large_group(),
passed = test_childspecs_at_init(),
passed = test_anonymous_supervisors(),
passed = test_no_migration_on_shutdown(),
@@ -158,7 +160,7 @@ test_no_migration_on_shutdown() ->
try
call(worker, ping),
exit(worker_should_not_have_migrated)
- catch exit:{timeout_waiting_for_server, _} ->
+ catch exit:{timeout_waiting_for_server, _, _} ->
ok
end
end, [evil, good]).
@@ -245,10 +247,10 @@ inc_group() ->
get_group(Group) ->
{Group, get(counter)}.
-call(Id, Msg) -> call(Id, Msg, 100, 10).
+call(Id, Msg) -> call(Id, Msg, 1000, 100).
call(Id, Msg, 0, _Decr) ->
- exit({timeout_waiting_for_server, {Id, Msg}});
+ exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
call(Id, Msg, MaxDelay, Decr) ->
try
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b3e92b69..96017df8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -244,7 +244,7 @@ determine_queue_nodes(Args) ->
case [list_to_atom(binary_to_list(Node)) ||
{longstr, Node} <- Nodes] of
[Node] -> {Node, undefined};
- [First | Rest] -> {First, Rest}
+ [First | Rest] -> {First, [First | Rest]}
end;
{{_Type, <<"all">>}, _} ->
{node(), all};
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 5e775ff1..9c5470d2 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -101,6 +101,9 @@ start() ->
{error, Reason} ->
rabbit_misc:print_error("~p", [Reason]),
rabbit_misc:quit(2);
+ {error_string, Reason} ->
+ print_error("~s", [Reason]),
+ rabbit_misc:quit(2);
{badrpc, {'EXIT', Reason}} ->
rabbit_misc:print_error("~p", [Reason]),
rabbit_misc:quit(2);
@@ -369,7 +372,23 @@ action(report, Node, _Args, _Opts, Inform) ->
[print_report(Node, Q) || Q <- ?GLOBAL_QUERIES],
[print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
io:format("End of server status report~n"),
- ok.
+ ok;
+
+action(eval, Node, [Expr], _Opts, _Inform) ->
+ case erl_scan:string(Expr) of
+ {ok, Scanned, _} ->
+ case erl_parse:parse_exprs(Scanned) of
+ {ok, Parsed} ->
+ {value, Value, _} = unsafe_rpc(
+ Node, erl_eval, exprs, [Parsed, []]),
+ io:format("~p~n", [Value]),
+ ok;
+ {error, E} ->
+ {error_string, format_parse_error(E)}
+ end;
+ {error, E, _} ->
+ {error_string, format_parse_error(E)}
+ end.
%%----------------------------------------------------------------------------
@@ -444,6 +463,9 @@ system(Cmd) ->
escape_quotes(Cmd) ->
lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
+format_parse_error({_Line, Mod, Err}) ->
+ lists:flatten(Mod:format_error(Err)).
+
%%----------------------------------------------------------------------------
default_if_empty(List, Default) when is_list(List) ->
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index cf3fea1a..2d0f5014 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -76,21 +76,28 @@ guid() ->
%% now() to move ahead of the system time), and b) it is really
%% slow since it takes a global lock and makes a system call.
%%
- %% A persisted serial number, in combination with self/0 (which
- %% includes the node name) uniquely identifies a process in space
+ %% A persisted serial number, the node, and a unique reference
+ %% (per node incarnation) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
%% us a GUID.
G = case get(guid) of
- undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
- 0};
- {S, I} -> {S, I+1}
+ undefined -> Serial = gen_server:call(?SERVER, serial, infinity),
+ {{Serial, node(), make_ref()}, 0};
+ {S, I} -> {S, I+1}
end,
put(guid, G),
erlang:md5(term_to_binary(G)).
%% generate a readable string representation of a GUID.
+%%
+%% employs base64url encoding, which is safer in more contexts than
+%% plain base64.
string_guid(Prefix) ->
- Prefix ++ "-" ++ base64:encode_to_string(guid()).
+ Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
+ ($\/, Acc) -> [$\_ | Acc];
+ ($\=, Acc) -> Acc;
+ (Chr, Acc) -> [Chr | Acc]
+ end, [], base64:encode_to_string(guid())).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 7182042d..d68063db 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -526,9 +526,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
CPid, BQ, BQS, GM, SS, MonitoringPids),
- MTC = dict:from_list(
- [{MsgId, {ChPid, MsgSeqNo}} ||
- {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
+ MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MSList),
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
@@ -725,7 +727,7 @@ process_instruction(
never ->
{MQ2, PendingCh, MS};
eventually ->
- {MQ2, sets:add_element(MsgId, PendingCh),
+ {MQ2, PendingCh,
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
immediately ->
ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c58e8d26..7e80fd28 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -251,28 +251,34 @@ assert_args_equivalence(Orig, New, Name, Keys) ->
ok.
assert_args_equivalence1(Orig, New, Name, Key) ->
- case {table_lookup(Orig, Key), table_lookup(New, Key)} of
+ {Orig1, New1} = {table_lookup(Orig, Key), table_lookup(New, Key)},
+ FailureFun = fun () ->
+ protocol_error(precondition_failed, "inequivalent arg '~s'"
+ "for ~s: received ~s but current is ~s",
+ [Key, rs(Name), val(New1), val(Orig1)])
+ end,
+ case {Orig1, New1} of
{Same, Same} ->
ok;
- {{OrigType, OrigVal} = Orig1, {NewType, NewVal} = New1} ->
+ {{OrigType, OrigVal}, {NewType, NewVal}} ->
case type_class(OrigType) == type_class(NewType) andalso
OrigVal == NewVal of
true -> ok;
- false -> protocol_error(precondition_failed, "inequivalent arg"
- " '~s' for ~s: received ~s but current"
- " is ~s",
- [Key, rs(Name), val(New1), val(Orig1)])
- end
+ false -> FailureFun()
+ end;
+ {_, _} ->
+ FailureFun()
end.
val(undefined) ->
"none";
val({Type, Value}) ->
- Fmt = case is_binary(Value) of
- true -> "the value '~s' of type '~s'";
- false -> "the value '~w' of type '~s'"
- end,
- lists:flatten(io_lib:format(Fmt, [Value, Type])).
+ ValFmt = case is_binary(Value) of
+ true -> "~s";
+ false -> "~w"
+ end,
+ lists:flatten(io_lib:format("the value '" ++ ValFmt ++ "' of type '~s'",
+ [Value, Type])).
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive due to general mnesia overheads (figuring out table types
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 2c0912df..045ab89a 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -307,9 +307,15 @@ connections() ->
rabbit_networking, connections_local, []).
connections_local() ->
- [rabbit_connection_sup:reader(ConnSup) ||
+ [Reader ||
{_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup)].
+ <- supervisor:which_children(rabbit_tcp_client_sup),
+ Reader <- [try
+ rabbit_connection_sup:reader(ConnSup)
+ catch exit:{noproc, _} ->
+ noproc
+ end],
+ Reader =/= noproc].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 9a653a1f..c5fa0801 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -103,11 +103,11 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
end,
NewEnabled = lists:usort(Enabled ++ ToEnable),
write_enabled_plugins(PluginsFile, NewEnabled),
+ NewImplicitlyEnabled = calculate_required_plugins(NewEnabled, AllPlugins),
+ maybe_warn_mochiweb(NewImplicitlyEnabled),
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
- _ -> NewImplicitlyEnabled =
- calculate_required_plugins(NewEnabled, AllPlugins),
- print_list("The following plugins have been enabled:",
+ _ -> print_list("The following plugins have been enabled:",
NewImplicitlyEnabled -- ImplicitlyEnabled),
io:format("Plugin configuration has changed. "
"Restart RabbitMQ for changes to take effect.~n")
@@ -353,3 +353,22 @@ calculate_dependencies(Reverse, Sources, AllPlugins) ->
end,
true = digraph:delete(G),
Dests.
+
+maybe_warn_mochiweb(Enabled) ->
+ V = erlang:system_info(otp_release),
+ case lists:member(mochiweb, Enabled) andalso V < "R13B01" of
+ true ->
+ Stars = string:copies("*", 80),
+ io:format("~n~n~s~n"
+ " Warning: Mochiweb enabled and Erlang version ~s "
+ "detected.~n"
+ " Enabling plugins that depend on Mochiweb is not "
+ "supported on this Erlang~n"
+ " version. At least R13B01 is required.~n~n"
+ " RabbitMQ will not start successfully in this "
+ "configuration. You *must*~n"
+ " disable the Mochiweb plugin, or upgrade Erlang.~n"
+ "~s~n~n~n", [Stars, V, Stars]);
+ false ->
+ ok
+ end.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4b545466..f03c1d1c 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -505,7 +505,10 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
[begin
ok = gatherer:fork(Gatherer),
ok = worker_pool:submit_async(
- fun () -> queue_index_walker_reader(QueueName, Gatherer)
+ fun () -> link(Gatherer),
+ ok = queue_index_walker_reader(QueueName, Gatherer),
+ unlink(Gatherer),
+ ok
end)
end || QueueName <- DurableQueues],
queue_index_walker({next, Gatherer});
@@ -837,13 +840,16 @@ segment_entries_foldr(Fun, Init,
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
+ Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
- false -> {array_new(), 0};
+ false -> Empty;
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- {ok, SegData} = file_handle_cache:read(
- Hdl, ?SEGMENT_TOTAL_SIZE),
- Res = load_segment_entries(KeepAcked, SegData, array_new(), 0),
+ Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
+ {ok, SegData} -> load_segment_entries(
+ KeepAcked, SegData, Empty);
+ eof -> Empty
+ end,
ok = file_handle_cache:close(Hdl),
Res
end.
@@ -853,15 +859,15 @@ load_segment_entries(KeepAcked,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
SegData/binary>>,
- SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}) ->
{MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1);
+ load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
load_segment_entries(KeepAcked,
<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}) ->
{UnackedCountDelta, SegEntries1} =
case array:get(RelSeq, SegEntries) of
{Pub, no_del, no_ack} ->
@@ -871,10 +877,10 @@ load_segment_entries(KeepAcked,
{_Pub, del, no_ack} ->
{-1, array:reset(RelSeq, SegEntries)}
end,
- load_segment_entries(KeepAcked, SegData, SegEntries1,
- UnackedCount + UnackedCountDelta);
-load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) ->
- {SegEntries, UnackedCount}.
+ load_segment_entries(KeepAcked, SegData,
+ {SegEntries1, UnackedCount + UnackedCountDelta});
+load_segment_entries(_KeepAcked, _SegData, Res) ->
+ Res.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b359f7d4..694abd9e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -281,7 +281,7 @@ handle_other({conserve_memory, Conserve}, Deb, State) ->
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, State);
+ mainloop(Deb, maybe_close(State));
handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 405949ef..f75da872 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -649,15 +649,69 @@ terminate_children([], _SupName, Res) ->
Res.
terminate_simple_children(Child, Dynamics, SupName) ->
- dict:fold(fun (Pid, _Args, _Any) ->
- do_terminate(Child#child{pid = Pid}, SupName)
- end, ok, Dynamics),
+ Pids = dict:fold(fun (Pid, _Args, Pids) ->
+ erlang:monitor(process, Pid),
+ unlink(Pid),
+ exit(Pid, child_exit_reason(Child)),
+ [Pid | Pids]
+ end, [], Dynamics),
+ TimeoutMsg = {timeout, make_ref()},
+ TRef = timeout_start(Child, TimeoutMsg),
+ {Replies, Timedout} =
+ lists:foldl(
+ fun (_Pid, {Replies, Timedout}) ->
+ {Reply, Timedout1} =
+ receive
+ TimeoutMsg ->
+ Remaining = Pids -- [P || {P, _} <- Replies],
+ [exit(P, kill) || P <- Remaining],
+ receive {'DOWN', _MRef, process, Pid, Reason} ->
+ {{error, Reason}, true}
+ end;
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ {child_res(Child, Reason, Timedout), Timedout};
+ {'EXIT', Pid, Reason} ->
+ receive {'DOWN', _MRef, process, Pid, _} ->
+ {{error, Reason}, Timedout}
+ end
+ end,
+ {[{Pid, Reply} | Replies], Timedout1}
+ end, {[], false}, Pids),
+ timeout_stop(Child, TRef, TimeoutMsg, Timedout),
+ ReportError = shutdown_error_reporter(SupName),
+ [case Reply of
+ {_Pid, ok} -> ok;
+ {Pid, {error, R}} -> ReportError(R, Child#child{pid = Pid})
+ end || Reply <- Replies],
+ ok.
+
+child_exit_reason(#child{shutdown = brutal_kill}) -> kill;
+child_exit_reason(#child{}) -> shutdown.
+
+child_res(#child{shutdown=brutal_kill}, killed, false) -> ok;
+child_res(#child{}, shutdown, false) -> ok;
+child_res(#child{restart_type=permanent}, normal, false) -> {error, normal};
+child_res(#child{restart_type={permanent,_}},normal, false) -> {error, normal};
+child_res(#child{}, normal, false) -> ok;
+child_res(#child{}, R, _) -> {error, R}.
+
+timeout_start(#child{shutdown = Time}, Msg) when is_integer(Time) ->
+ erlang:send_after(Time, self(), Msg);
+timeout_start(#child{}, _Msg) ->
+ ok.
+
+timeout_stop(#child{shutdown = Time}, TRef, Msg, false) when is_integer(Time) ->
+ erlang:cancel_timer(TRef),
+ receive
+ Msg -> ok
+ after
+ 0 -> ok
+ end;
+timeout_stop(#child{}, ok, _Msg, _Timedout) ->
ok.
do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
- ReportError = fun (Reason) ->
- report_error(shutdown_error, Reason, Child, SupName)
- end,
+ ReportError = shutdown_error_reporter(SupName),
case shutdown(Child#child.pid, Child#child.shutdown) of
ok ->
ok;
@@ -668,7 +722,7 @@ do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
_ -> ok
end;
{error, OtherReason} ->
- ReportError(OtherReason)
+ ReportError(OtherReason, Child)
end,
Child#child{pid = undefined};
do_terminate(Child, _SupName) ->
@@ -998,6 +1052,10 @@ report_error(Error, Reason, Child, SupName) ->
{offender, extract_child(Child)}],
error_logger:error_report(supervisor_report, ErrorMsg).
+shutdown_error_reporter(SupName) ->
+ fun(Reason, Child) ->
+ report_error(shutdown_error, Reason, Child, SupName)
+ end.
extract_child(Child) ->
[{pid, Child#child.pid},