diff options
author | Lukas Larsson <lukas@erlang.org> | 2022-04-30 13:55:18 +0200 |
---|---|---|
committer | Lukas Larsson <lukas@erlang.org> | 2022-04-30 13:56:19 +0200 |
commit | 9b349a58ff1b0ca770f43f829630b3690f5478c1 (patch) | |
tree | 38b3533ba9b666158c224de7ea5cd7dec2630c9e /erts | |
parent | 3002f55f409f44122d3a45ac53bfd453e9aa2cb2 (diff) | |
parent | 864cfc571caa17dcbedfd97047a4a94a731d5304 (diff) | |
download | erlang-9b349a58ff1b0ca770f43f829630b3690f5478c1.tar.gz |
Merge branch 'lukas/22/erts/fix-dist-fragment-exit-leak/OTP-18077' into lukas/23/erts/fix-dist-fragment-exit-leak/OTP-18077
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/beam/bif.c | 5 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 7 | ||||
-rw-r--r-- | erts/emulator/beam/erl_alloc.c | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 7 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 34 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 1 | ||||
-rw-r--r-- | erts/emulator/test/distribution_SUITE.erl | 166 | ||||
-rw-r--r-- | erts/etc/unix/etp-commands.in | 215 |
8 files changed, 418 insertions, 20 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index eefb722c56..543f8ba2d1 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -54,7 +54,10 @@ static Export* flush_monitor_messages_trap = NULL; static Export* set_cpu_topology_trap = NULL; static Export* await_port_send_result_trap = NULL; Export* erts_format_cpu_topology_trap = NULL; -static Export dsend_continue_trap_export; +#ifndef DEBUG +static +#endif +Export dsend_continue_trap_export; Export *erts_convert_time_unit_trap = NULL; static Export *await_msacc_mod_trap = NULL; diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index c79c0834f6..dd09e1d73f 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3332,6 +3332,7 @@ erts_dsig_send(ErtsDSigSendContext *ctx) erts_mtx_unlock(&dep->qlock); plp = erts_proclist_create(ctx->c_p); + erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL); suspended = 1; erts_mtx_lock(&dep->qlock); @@ -3402,12 +3403,15 @@ erts_dsig_send(ErtsDSigSendContext *ctx) } /* More fragments left to be sent, yield and re-schedule */ if (ctx->fragments) { + ctx->c_p->flags |= F_FRAGMENTED_SEND; retval = ERTS_DSIG_SEND_CONTINUE; if (!resume && erts_system_monitor_flags.busy_dist_port) monitor_generic(ctx->c_p, am_busy_dist_port, cid); goto done; } } + + if (ctx->c_p) ctx->c_p->flags &= ~F_FRAGMENTED_SEND; ctx->obuf = NULL; if (suspended) { @@ -3796,9 +3800,8 @@ erts_dist_command(Port *prt, int initial_reds) obufsize = 0; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && de_busy && qsize < erts_dist_buf_busy_limit) { - ErtsProcList *suspendees; int resumed; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); + ErtsProcList *suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index cda8855150..2840dff62f 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -2125,7 +2125,8 @@ alcu_size(ErtsAlcType_t alloc_no, ErtsAlcUFixInfo_t *fi, int fisz) erts_alcu_foreign_size(allctr, alloc_no, &size); } - ASSERT(((SWord)size.blocks) >= 0); + /* Sanity check of block size on 64-bit */ + ASSERT(sizeof(UWord) == 4 || ((SWord)size.blocks) >= 0); res += size.blocks; } diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index ad5610d817..b2e34814e2 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -4030,9 +4030,11 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ErtsMonitorSuspend *msp; erts_aint_t mstate; msp = (ErtsMonitorSuspend *) erts_monitor_to_data(tmon); - mstate = erts_atomic_read_acqb(&msp->state); - if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE) + mstate = erts_atomic_read_band_acqb( + &msp->state, ~ERTS_MSUSPEND_STATE_FLG_ACTIVE); + if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE) { erts_resume(c_p, ERTS_PROC_LOCK_MAIN); + } break; } default: @@ -4732,6 +4734,7 @@ erts_proc_sig_signal_size(ErtsSignal *sig) case ERTS_MON_TYPE_PROC: case ERTS_MON_TYPE_DIST_PROC: case ERTS_MON_TYPE_NODE: + case ERTS_MON_TYPE_SUSPEND: size = erts_monitor_size((ErtsMonitor *) sig); break; default: diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 65a6e1c6ad..063ce87d66 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -13468,6 +13468,7 @@ enum continue_exit_phase { ERTS_CONTINUE_EXIT_MONITORS, ERTS_CONTINUE_EXIT_LT_MONITORS, ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG, + ERTS_CONTINUE_EXIT_DIST_SEND, ERTS_CONTINUE_EXIT_DIST_LINKS, ERTS_CONTINUE_EXIT_DIST_MONITORS, ERTS_CONTINUE_EXIT_DIST_PEND_SPAWN_MONITORS, @@ -13486,6 +13487,10 @@ struct continue_exit_state { Uint32 block_rla_ref; }; +#ifdef DEBUG +extern Export dsend_continue_trap_export; +#endif + void erts_continue_exit_process(Process *p) { @@ -13725,6 +13730,20 @@ restart: trap_state->pectxt.dist_state = NIL; trap_state->pectxt.yield = 0; + p->rcount = 0; + + if (p->flags & F_FRAGMENTED_SEND) { + /* The process was re-scheduled while doing a fragmented + distributed send (possibly because it was suspended). + We need to finish doing that send as otherwise incomplete + fragmented messages will be sent to other nodes potentially + causing memory leaks. + */ + ASSERT(p->current == &dsend_continue_trap_export.info.mfa); + /* arg_reg[0] is the argument used in dsend_continue_trap_export */ + trap_state->pectxt.dist_state = p->arg_reg[0]; + } + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); erts_proc_sig_fetch(p); @@ -13786,11 +13805,12 @@ restart: reds -= r; - trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS; + trap_state->phase = ERTS_CONTINUE_EXIT_DIST_SEND; } - case ERTS_CONTINUE_EXIT_DIST_LINKS: { + case ERTS_CONTINUE_EXIT_DIST_SEND: { - continue_dist_send: + continue_dist_send: + ASSERT(p->rcount == 0); if (is_not_nil(trap_state->pectxt.dist_state)) { Binary* bin = erts_magic_ref2bin(trap_state->pectxt.dist_state); ErtsDSigSendContext* ctx = (ErtsDSigSendContext*) ERTS_MAGIC_BIN_DATA(bin); @@ -13823,6 +13843,13 @@ restart: goto restart; } + trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS; + } + case ERTS_CONTINUE_EXIT_DIST_LINKS: { + + if (is_not_nil(trap_state->pectxt.dist_state)) + goto continue_dist_send; + reds = erts_link_tree_foreach_delete_yielding( &trap_state->pectxt.dist_links, erts_proc_exit_handle_dist_link, @@ -13831,6 +13858,7 @@ restart: reds); if (reds <= 0 || trap_state->pectxt.yield) goto yield; + trap_state->phase = ERTS_CONTINUE_EXIT_DIST_MONITORS; } case ERTS_CONTINUE_EXIT_DIST_MONITORS: { diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 9e5ea73869..28826dd57b 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1486,6 +1486,7 @@ extern int erts_system_profile_ts_type; #define F_DIRTY_MINOR_GC (1 << 21) /* Dirty minor GC scheduled */ #define F_HIBERNATED (1 << 22) /* Hibernated */ #define F_TRAP_EXIT (1 << 23) /* Trapping exit */ +#define F_FRAGMENTED_SEND (1 << 24) /* Process is doing a distributed fragmented send */ /* Signal queue flags */ #define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */ diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index 128c9b07e0..718968e771 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -64,6 +64,7 @@ bad_dist_ext_size/1, start_epmd_false/1, epmd_module/1, bad_dist_fragments/1, + exit_dist_fragments/1, message_latency_large_message/1, message_latency_large_link_exit/1, message_latency_large_monitor_exit/1, @@ -102,7 +103,7 @@ all() -> {group, trap_bif}, {group, dist_auto_connect}, dist_parallel_send, atom_roundtrip, unicode_atom_roundtrip, contended_atom_cache_entry, contended_unicode_atom_cache_entry, - {group, message_latency}, + {group, message_latency}, exit_dist_fragments, {group, bad_dist}, {group, bad_dist_ext}, start_epmd_false, epmd_module, system_limit, hopefull_data_encoding, hopefull_export_fun_bug, @@ -2538,6 +2539,169 @@ dmsg_bad_atom_cache_ref() -> dmsg_bad_tag() -> %% Will fail early at heap size calculation [$?, 66]. +%% Test that processes exiting while sending a fragmented message works +%% as it should. We test that this works while the process doing the send +%% is suspended/resumed in order to trigger bugs in suspend/resume handling +%% while exiting. +%% We also make sure that the binary memory of the receiving node does not grow +%% without shrinking back as there used to be a memory leak on the receiving side. +exit_dist_fragments(_Config) -> + {ok, Node} = start_node(?FUNCTION_NAME), + try + ct:log("Allocations before:~n~p",[erpc:call(Node,instrument,allocations, [])]), + {BinInfo, BinInfoMref} = + spawn_monitor(Node, + fun() -> + (fun F(Acc) -> + H = try erlang:memory(binary) + catch _:_ -> 0 end, + receive + {get, Pid} -> + After = try erlang:memory(binary) + catch _:_ -> 0 end, + Pid ! lists:reverse([After,H|Acc]) + after 100 -> + F([H|Acc]) + end + end)([]) + end), + {Tracer, Mref} = spawn_monitor(fun gather_exited/0), + erlang:trace(self(), true, [{tracer, Tracer}, set_on_spawn, procs, exiting]), + exit_suspend(Node), + receive + {'DOWN',Mref,_,_,_} -> + BinInfo ! {get, self()}, + receive + {'DOWN',BinInfoMref,_,_,Reason} -> + ct:fail(Reason); + Info -> + Before = hd(Info), + Max = lists:max(Info), + After = lists:last(Info), + ct:log("Binary memory before: ~p~n" + "Binary memory max: ~p~n" + "Binary memory after: ~p", + [Before, Max, After]), + ct:log("Allocations after:~n~p", + [erpc:call(Node,instrument,allocations, [])]), + %% We check that the binary data used after is not too large + if + (After - Before) / (Max - Before) > 0.05 -> + ct:log("Memory ratio was ~p",[(After - Before) / (Max - Before)]), + ct:fail("Potential binary memory leak!"); + true -> ok + end + end + end + after + stop_node(Node) + end. + +%% Make sure that each spawned process also has exited +gather_exited() -> + process_flag(message_queue_data, off_heap), + gather_exited(#{}). +gather_exited(Pids) -> + receive + {trace,Pid,spawned,_,_} -> + gather_exited(Pids#{ Pid => true }); + {trace,Pid,exited_out,_,_} -> + {true, NewPids} = maps:take(Pid, Pids), + gather_exited(NewPids); + _M -> + gather_exited(Pids) + after 1000 -> + if Pids == #{} -> ok; + true -> exit(Pids) + end + end. + +exit_suspend(RemoteNode) -> + exit_suspend(RemoteNode, 100). +exit_suspend(RemoteNode, N) -> + Payload = case erlang:system_info(wordsize) of + 8 -> + [<<0:100000/unit:8>> || _ <- lists:seq(1, 10)]; + 4 -> + [<<0:100000/unit:8>> || _ <- lists:seq(1, 2)] + end, + exit_suspend(RemoteNode, N, Payload). +exit_suspend(RemoteNode, N, Payload) -> + Echo = fun F() -> + receive + {From, Msg} -> + From ! erlang:iolist_size(Msg), + F() + end + end, + Pinger = + fun() -> + false = process_flag(trap_exit, true), + RemotePid = spawn_link(RemoteNode, Echo), + Iterations = case erlang:system_info(debug_compiled) of + false -> + 100; + _ -> + 10 + end, + exit_suspend_loop(RemotePid, 2, Payload, Iterations) + end, + Pids = [spawn_link(Pinger) || _ <- lists:seq(1, N)], + MRefs = [monitor(process, Pid) || Pid <- Pids], + [receive {'DOWN',MRef,_,_,_} -> ok end || MRef <- MRefs], + Pids. + +exit_suspend_loop(RemotePid, _Suspenders, _Payload, 0) -> + exit(RemotePid, die), + receive + {'EXIT', RemotePid, _} -> + ok + end; +exit_suspend_loop(RemotePid, Suspenders, Payload, N) -> + LocalPid = spawn_link( + fun() -> + Parent = self(), + [spawn_link( + fun F() -> + try + begin + erlang:suspend_process(Parent), + erlang:yield(), + erlang:suspend_process(Parent), + erlang:yield(), + erlang:resume_process(Parent), + erlang:yield(), + erlang:suspend_process(Parent), + erlang:yield(), + erlang:resume_process(Parent), + erlang:yield(), + erlang:resume_process(Parent), + erlang:yield() + end of + _ -> + F() + catch _:_ -> + ok + end + end) || _ <- lists:seq(1, Suspenders)], + (fun F() -> + RemotePid ! {self(), Payload}, + receive _IOListSize -> ok end, + F() + end)() + end), + exit_suspend_loop(LocalPid, RemotePid, Suspenders, Payload, N - 1). +exit_suspend_loop(LocalPid, RemotePid, Suspenders, Payload, N) -> + receive + {'EXIT', LocalPid, _} -> + exit_suspend_loop(RemotePid, Suspenders, Payload, N); + {'EXIT', _, Reason} -> + exit(Reason) + after 100 -> + exit(LocalPid, die), + exit_suspend_loop(LocalPid, RemotePid, Suspenders, Payload, N) + end. + start_epmd_false(Config) when is_list(Config) -> %% Start a node with the option -start_epmd false. {ok, OtherNode} = start_node(start_epmd_false, "-start_epmd false"), diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in index f440b6a882..6cba73b27e 100644 --- a/erts/etc/unix/etp-commands.in +++ b/erts/etc/unix/etp-commands.in @@ -57,7 +57,7 @@ document etp-help % etp-msgq, etpf-msgq, % etp-stacktrace, etp-stacktrace-emu, etp-stackdump, etp-stackdump-emu, % etpf-stackdump, etp-dictdump -% etp-process-info, etp-process-memory-info +% etp-process-info, etp-process-info-x, etp-process-memory-info % etp-port-info, etp-port-state, etp-port-sched-flags % etp-heapdump, etp-offheapdump, etpf-offheapdump, % etp-search-heaps, etp-search-alloc, @@ -68,7 +68,7 @@ document etp-help % % System inspection % etp-system-info, etp-schedulers, etp-process, etp-ports, etp-lc-dump, -% etp-migration-info, etp-processes-memory, +% etp-migration-info, etp-processes, etp-processes-x, etp-processes-memory, % etp-compile-info, etp-config-h-info % % Platform specific (when gdb fails you) @@ -2080,6 +2080,181 @@ define etp-pid2proc printf "(Process*)%p\n", $proc end +define etp-mon-lnk-flags-int + printf " Flags: " + if ($arg0) & (1 << 0) + printf "TARGET " + else + printf "ORIGIN " + end + if ($arg0) & (1 << 1) + printf "IN_TABLE " + end + if ($arg0) & (1 << 2) + printf "IN_SUB_TABLE " + end + if ($arg0) & (1 << 3) + printf "NAME " + end + if ($arg0) & (1 << 4) + printf "EXTENDED " + end + if ($arg0) & (1 << 5) + printf "SPAWN_PENDING " + end + if ($arg0) & (1 << 6) + printf "SPAWN_MONITOR " + end + if ($arg0) & (1 << 7) + printf "SPAWN_LINK " + end + if ($arg0) & (1 << 8) + printf "SPAWN_ABANDON " + end + if ($arg0) & (1 << 9) + printf "SPAWN_NO_SMSG " + end + if ($arg0) & (1 << 10) + printf "SPAWN_NO_EMSG " + end + if ($arg0) & (1 << 15) + printf "DBG_VISITED " + end + printf "\n" +end + +define etp-monitor-info-int + set $etp_mon = (ErtsMonitor *)($arg0) + set $etp_mdp = ((ErtsMonitorData *)(((char*)$etp_mon) - (((ErtsMonLnkNode*)$etp_mon)->offset))) + printf " --- Monitor\n" + printf " Type: " + if $etp_mon->type == 0 + printf "Proc" + printf "\n Other: " + etp-1 $etp_mon->other.item + end + if $etp_mon->type == 1 + printf "Port" + printf "\n Other: " + etp-1 $etp_mon->other.item + end + if $etp_mon->type == 2 + printf "Time_offset" + printf "\n Other: " + etp-1 $etp_mon->other.item + end + if $etp_mon->type == 3 + printf "Dist_proc" + printf "\n Other: " + etp-1 $etp_mon->other.item + end + if $etp_mon->type == 4 + printf "Resource" + printf "\n Other: " + if $etp_mon_origin + etp-1 $etp_mon->other.item + else + printf "(void*)%p", $etp_mon->other.item + end + end + if $etp_mon->type == 5 + printf "Node" + printf "\n Other: " + etp-1 $etp_mon->other.item + end + if $etp_mon->type == 6 + printf "Nodes" + printf "\n Other: " + etp-1 $etp_mon->other.item + end + if $etp_mon->type == 7 + printf "Suspend\n" + printf " State: " + printf "%d", ((ErtsMonitorSuspend*)$etp_mdp)->state.counter & ~0x8000000000000000 + if ((ErtsMonitorSuspend*)$etp_mdp)->state.counter & 0x8000000000000000 + printf " ACTIVE" + else + printf " NOT ACTIVE" + end + printf "\n Other: " + etp-1 $etp_mon->other.item + end + printf "\n" + etp-mon-lnk-flags-int $etp_mon->flags + printf " Pointer: (ErtsMonitorData*)%p\n", $etp_mdp +end + +define etp-links-info + if ($arg0) != 0 + etp-links-info ($arg0)->node.tree.left + printf " --- Link\n" + printf " Type: " + if ($arg0)->type == (8 + 1) + printf "Proc" + printf "\n Other: " + etp-1 ($arg0)->other.item + end + if ($arg0)->type == (8 + 2) + printf "Port" + printf "\n Other: " + etp-1 ($arg0)->other.item + end + if ($arg0)->type == (8 + 3) + printf "Dist Proc" + printf "\n Other: " + etp-1 ($arg0)->other.item + end + printf "\n" + etp-mon-lnk-flags-int ($arg0)->flags + printf " Pointer: (ErtsLink*)%p\n", ($arg0) + etp-links-info ($arg0)->node.tree.right + end +end + +define etp-monitor-info + if ($arg0) != 0 + etp-monitor-info ($arg0)->node.tree.left + etp-monitor-info-int $arg0 + etp-monitor-info ($arg0)->node.tree.right + end +end + +define etp-links + set $etp_proc = (Process *)($arg0) + if $etp_proc->state.counter & 0x400 + ## Proc is free, monitors may be in trap state + if $etp_proc->i == beam_continue_exit && ($etp_proc)->u.terminate != 0 + set $trap_state = ((struct continue_exit_state*)($etp_proc)->u.terminate) + set $lt_monitor = $trap_state->lt_monitors + set $monitor = $trap_state.monitors + set $links = $trap_state.links + else + ## The monitors are in the trap state on the stack + set $lt_monitor = 0 + set $monitor = 0 + set $link = 0 + end + else + set $monitor = ($etp_proc)->common.u.alive.monitors + set $lt_monitor = ($etp_proc)->common.u.alive.lt_monitors + set $link = ($etp_proc)->common.u.alive.links + end + + etp-links-info $link + etp-monitor-info $monitor + + if $lt_monitor != 0 + set $orig_lt_monitor = $lt_monitor + etp-monitor-info-int $lt_monitor + set $lt_monitor = $lt_monitor->node.list.next + while $lt_monitor != $orig_lt_monitor + etp-monitor-info-int $lt_monitor + set $lt_monitor = $lt_monitor->node.list.next + end + end + +end + define etp-proc-state-int # Args: int # @@ -2232,8 +2407,11 @@ end define etp-proc-flags-int # Args: int # - if ($arg0 & ~((1 << 24)-1)) - printf "GARBAGE<%x> ", ($arg0 & ~((1 << 24)-1)) + if ($arg0 & ~((1 << 25)-1)) + printf "GARBAGE<%x> ", ($arg0 & ~((1 << 25)-1)) + end + if ($arg0 & (1 << 24)) + printf "fragmented-send " end if ($arg0 & (1 << 23)) printf "trap-exit " @@ -2391,6 +2569,7 @@ define etp-process-info-int end if ($arg1) etp-sigqs $etp_proc + etp-links $etp_proc end end @@ -2398,15 +2577,23 @@ define etp-process-info etp-process-info-int ($arg0) 0 end +document etp-process-info +%--------------------------------------------------------------------------- +% etp-process-info Process* +% +% Print info about process +%--------------------------------------------------------------------------- +end + define etp-process-info-x etp-process-info-int ($arg0) !0 end -document etp-process-info +document etp-process-info-x %--------------------------------------------------------------------------- -% etp-process-info Process* +% etp-process-info-x Process* % -% Print info about process +% Print extended info about process %--------------------------------------------------------------------------- end @@ -2505,15 +2692,23 @@ define etp-processes etp-processes-int 0 end +document etp-processes +%--------------------------------------------------------------------------- +% etp-processes +% +% Print misc info about all processes +%--------------------------------------------------------------------------- +end + define etp-processes-x etp-processes-int !0 end -document etp-processes +document etp-processes-x %--------------------------------------------------------------------------- -% etp-processes +% etp-processes-x % -% Print misc info about all processes +% Print extended misc info about all processes %--------------------------------------------------------------------------- end |