diff options
author | Rickard Green <rickard@erlang.org> | 2022-07-13 20:29:52 +0200 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2022-07-13 20:29:52 +0200 |
commit | 9c63a7459cb4489bddd63be9468929cab709ab01 (patch) | |
tree | 9c81595b06601ccab1f6b29bbee2100a3f1f0217 | |
parent | c92c618ce7f44eea7f288bb9e783903ad7b2a577 (diff) | |
parent | f002c5e89e0ba0329fc047d13591d79b4dada346 (diff) | |
download | erlang-9c63a7459cb4489bddd63be9468929cab709ab01.tar.gz |
Merge branch 'maint'
* maint:
[erts] Yield when adjusting large message queues
[erts] Improve flushing of signals
24 files changed, 1776 insertions, 361 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 09cc8fc6b5..b04a5f6052 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -172,6 +172,7 @@ atom cflags atom CHANGE='CHANGE' atom characters_to_binary_int atom characters_to_list_int +atom check_gc atom clear atom clock_service atom close diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index f910a4e8be..0e33dfa667 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -975,12 +975,39 @@ set_default_trace_pattern(Eterm module) } int -erts_check_copy_literals_gc_need(Process *c_p, int *redsp, - char *literals, Uint lit_bsize) +erts_check_copy_literals_gc_need_max_reds(Process *c_p) { + Uint64 words, reds; + /* - * TODO: Implement yielding support! + * Calculate maximum amount of words that needs + * to be scanned... */ + words = 1; /* fvalue */ + words += c_p->hend - c_p->stop; /* stack */ + words += c_p->htop - c_p->heap; /* new heap */ + if (c_p->abandoned_heap) + words += c_p->heap_sz; /* abandoned heap */ + words += c_p->old_htop - c_p->old_heap; /* old heap */ + if (c_p->dictionary) { + Eterm* start = ERTS_PD_START(c_p->dictionary); + Eterm* end = start + ERTS_PD_SIZE(c_p->dictionary); + + words += end - start; /* dictionary */ + } + words += c_p->mbuf_sz; /* heap and message fragments */ + + /* Convert to reductions... */ + reds = ((words - 1)/ERTS_CLA_SCAN_WORDS_PER_RED) + 1; + if (reds > CONTEXT_REDS) + return CONTEXT_REDS+1; + return (int) reds; +} + +int +erts_check_copy_literals_gc_need(Process *c_p, int *redsp, + char *literals, Uint lit_bsize) +{ ErlHeapFragment *hfrag; ErtsMessage *mfp; Uint64 scanned = 0; @@ -1529,22 +1556,30 @@ erts_literal_area_collector_send_copy_request_3(BIF_ALIST_3) req_id = TUPLE3(&tmp_heap[0], BIF_ARG_2, BIF_ARG_3, BIF_ARG_1); - if (BIF_ARG_3 == am_false) { + switch (BIF_ARG_3) { + + case am_init: /* - * Will handle signal queue and check if GC if needed. If - * GC is needed operation will be continued by a GC (below). + * Will handle signal queue and if possible check if GC if needed. + * If GC is needed or needs to be checked the operation will be + * restarted later in the 'check_gc' or 'need_gc' case below... */ erts_proc_sig_send_cla_request(BIF_P, BIF_ARG_1, req_id); - } - else if (BIF_ARG_3 == am_true) { + break; + + case am_check_gc: + case am_need_gc: /* - * Will perform a literal GC. Note that this assumes that - * signal queue already has been handled... + * Will check and/or perform a literal GC. Note that this assumes that + * signal queue already has been handled by 'init' case above... */ - erts_schedule_cla_gc(BIF_P, BIF_ARG_1, req_id); - } - else + erts_schedule_cla_gc(BIF_P, BIF_ARG_1, req_id, + BIF_ARG_3 == am_check_gc); + break; + + default: BIF_ERROR(BIF_P, BADARG); + } BIF_RET(am_ok); } diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 385866f279..e9d8d920fc 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -1523,7 +1523,7 @@ erts_internal_await_exit_trap(BIF_ALIST_0) * terminated in order to ensure that signal order * is preserved. Yield if necessary. */ - erts_aint32_t state; + erts_aint32_t state = erts_atomic32_read_nob(&BIF_P->state); int reds = ERTS_BIF_REDS_LEFT(BIF_P); (void) erts_proc_sig_handle_incoming(BIF_P, &state, &reds, reds, !0); @@ -5397,45 +5397,52 @@ static BIF_RETTYPE bif_return_trap(BIF_ALIST_2) } static BIF_RETTYPE -bif_handle_signals_return(BIF_ALIST_1) +bif_handle_signals_return(BIF_ALIST_2) { - int local_only = BIF_P->sig_qs.flags & FS_LOCAL_SIGS_ONLY; - int sres, sreds, reds_left; + int reds_left; erts_aint32_t state; - reds_left = ERTS_BIF_REDS_LEFT(BIF_P); - sreds = reds_left; - - if (!local_only) { - erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MSGQ); - erts_proc_sig_fetch(BIF_P); - erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MSGQ); + if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS) { + flushed: + ASSERT(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS); + BIF_P->sig_qs.flags &= ~(FS_FLUSHED_SIGS|FS_FLUSHING_SIGS); + erts_set_gc_state(BIF_P, !0); /* Allow GC again... */ + BIF_RET(BIF_ARG_2); } + + if (!(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS)) { + int flags = ((is_internal_pid(BIF_ARG_1) + || is_internal_port(BIF_ARG_1)) + ? ERTS_PROC_SIG_FLUSH_FLG_FROM_ID + : ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL); + erts_proc_sig_init_flush_signals(BIF_P, flags, BIF_ARG_1); + if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS) + goto flushed; + } + + ASSERT(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS); - state = erts_atomic32_read_nob(&BIF_P->state); - sres = erts_proc_sig_handle_incoming(BIF_P, &state, &sreds, - sreds, !0); - - BUMP_REDS(BIF_P, (int) sreds); - reds_left -= sreds; + reds_left = ERTS_BIF_REDS_LEFT(BIF_P); - if (state & ERTS_PSFLG_EXITING) { - BIF_P->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY; - ERTS_BIF_EXITED(BIF_P); - } - if (!sres | (reds_left <= 0)) { - /* - * More signals to handle or out of reds; need - * to yield and continue. Prevent fetching of - * more signals by setting local-sigs-only flag. - */ - BIF_P->sig_qs.flags |= FS_LOCAL_SIGS_ONLY; - ERTS_BIF_YIELD1(&erts_bif_handle_signals_return_export, - BIF_P, BIF_ARG_1); - } + state = erts_atomic32_read_nob(&BIF_P->state); + do { + int sreds = reds_left; + (void) erts_proc_sig_handle_incoming(BIF_P, &state, &sreds, + sreds, !0); + BUMP_REDS(BIF_P, (int) sreds); + if (state & ERTS_PSFLG_EXITING) + ERTS_BIF_EXITED(BIF_P); + if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS) + goto flushed; + reds_left -= sreds; + } while (reds_left > 0); - BIF_P->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY; - BIF_RET(BIF_ARG_1); + /* + * More signals to handle, but out of reductions. Yield + * and come back here and continue... + */ + ERTS_BIF_YIELD2(&erts_bif_handle_signals_return_export, + BIF_P, BIF_ARG_1, BIF_ARG_2); } Export bif_return_trap_export; @@ -5477,7 +5484,7 @@ void erts_init_bif(void) &bif_return_trap); erts_init_trap_export(&erts_bif_handle_signals_return_export, - am_erlang, am_bif_handle_signals_return, 1, + am_erlang, am_bif_handle_signals_return, 2, &bif_handle_signals_return); erts_await_result = erts_export_put(am_erts_internal, diff --git a/erts/emulator/beam/bif.h b/erts/emulator/beam/bif.h index 350f2ad430..a89ea5d4f2 100644 --- a/erts/emulator/beam/bif.h +++ b/erts/emulator/beam/bif.h @@ -522,12 +522,12 @@ do { \ extern Export erts_bif_handle_signals_return_export; -#define ERTS_BIF_HANDLE_SIGNALS_RETURN(P, VAL) \ - BIF_TRAP1(&erts_bif_handle_signals_return_export, (P), (VAL)) +#define ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(P, FROM, VAL) \ + BIF_TRAP2(&erts_bif_handle_signals_return_export, (P), (FROM), (VAL)) -#define ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(Ret, P, Val) \ - ERTS_BIF_PREP_TRAP1((Ret), &erts_bif_handle_signals_return_export, \ - (P), (Val)) +#define ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(Ret, P, From, Val) \ + ERTS_BIF_PREP_TRAP2((Ret), &erts_bif_handle_signals_return_export, \ + (P), (From), (Val)) #define ERTS_BIF_PREP_EXITED(RET, PROC) \ do { \ diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 4aed5d9b34..24ba019075 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -274,6 +274,7 @@ type THR_PRGR_DATA LONG_LIVED SYSTEM thr_prgr_data type T_THR_PRGR_DATA SHORT_LIVED SYSTEM temp_thr_prgr_data type RELEASE_LAREA SHORT_LIVED SYSTEM release_literal_area type SIG_DATA SHORT_LIVED PROCESSES signal_data +type SIG_YIELD_DATA SHORT_LIVED PROCESSES signal_yield_data type DIST_DEMONITOR SHORT_LIVED PROCESSES dist_demonitor type CML_CLEANUP SHORT_LIVED SYSTEM connection_ml_cleanup type ML_YIELD_STATE SHORT_LIVED SYSTEM monitor_link_yield_state diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 2c27d460fd..430c4f1d80 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -1007,6 +1007,7 @@ process_info_aux(Process *c_p, Process *rp, ErtsProcLocks rp_locks, int item_ix, + Sint *msgq_len_p, int flags, Uint *reserve_sizep, Uint *reds); @@ -1025,11 +1026,12 @@ erts_process_info(Process *c_p, Eterm res; Eterm part_res[ERTS_PI_ARGS]; int item_ix_ix, ix; + Sint msgq_len = -1; if (ERTS_PI_FLAG_SINGELTON & flags) { ASSERT(item_ix_len == 1); res = process_info_aux(c_p, hfact, rp, rp_locks, item_ix[0], - flags, &reserve_size, reds); + &msgq_len, flags, &reserve_size, reds); return res; } @@ -1047,7 +1049,7 @@ erts_process_info(Process *c_p, ix = pi_arg2ix(am_messages); ASSERT(part_res[ix] == THE_NON_VALUE); res = process_info_aux(c_p, hfact, rp, rp_locks, ix, - flags, &reserve_size, reds); + &msgq_len, flags, &reserve_size, reds); ASSERT(res != am_undefined); ASSERT(res != THE_NON_VALUE); part_res[ix] = res; @@ -1057,7 +1059,7 @@ erts_process_info(Process *c_p, ix = item_ix[item_ix_ix]; if (part_res[ix] == THE_NON_VALUE) { res = process_info_aux(c_p, hfact, rp, rp_locks, ix, - flags, &reserve_size, reds); + &msgq_len, flags, &reserve_size, reds); ASSERT(res != am_undefined); ASSERT(res != THE_NON_VALUE); part_res[ix] = res; @@ -1092,6 +1094,92 @@ erts_process_info(Process *c_p, static void pi_setup_grow(int **arr, int *def_arr, Uint *sz, int ix); +static ERTS_INLINE int +pi_maybe_flush_signals(Process *c_p, int pi_flags) +{ + int reds_left; + erts_aint32_t state; + + /* + * pi_maybe_flush_signals() flush signals in callers + * signal queue for two different reasons: + * + * 1. If we need 'message_queue_len', but not 'messages', we need + * to handle all signals in the middle queue in order for + * 'c_p->sig_qs.len' to reflect the amount of messages in the + * message queue. We could count traverse the queues, but it + * is better to handle all signals in the queue instead since + * this is work we anyway need to do at some point. + * + * 2. Ensures that all signals that the caller might have sent to + * itself are handled before we gather information. + * + * This is, however, not strictly necessary. process_info() is + * not documented to send itself a signal when gathering + * information about itself. That is, the operation is not + * bound by the signal order guarantee when gathering + * information about itself. If we do not handle outstanding + * signals before gathering the information, outstanding signals + * from the caller to itself will not be part of the result. + * This would not be wrong, but perhaps surprising for the user. + * We continue doing it this way for now, since this is how it + * has been done for a very long time. We should, however, + * consider changing this in a future release, since this signal + * handling is not for free, although quite cheap since these + * signals anyway must be handled at some point. + */ + + if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) { + flushed: + + ASSERT(((pi_flags & (ERTS_PI_FLAG_WANT_MSGS + | ERTS_PI_FLAG_NEED_MSGQ_LEN)) + != ERTS_PI_FLAG_NEED_MSGQ_LEN) + || !c_p->sig_qs.cont); + ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); + + c_p->sig_qs.flags &= ~(FS_FLUSHED_SIGS|FS_FLUSHING_SIGS); + erts_set_gc_state(c_p, !0); /* Allow GC again... */ + return 0; /* done, all signals handled... */ + } + + state = erts_atomic32_read_nob(&c_p->state); + + if (!(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) { + int flush_flags = 0; + if (((pi_flags & (ERTS_PI_FLAG_WANT_MSGS + | ERTS_PI_FLAG_NEED_MSGQ_LEN)) + == ERTS_PI_FLAG_NEED_MSGQ_LEN) + && c_p->sig_qs.cont) { + flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ; + } + if (state & ERTS_PSFLG_MAYBE_SELF_SIGS) + flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_FROM_ID; + if (!flush_flags) + return 0; /* done; no need to flush... */ + erts_proc_sig_init_flush_signals(c_p, flush_flags, c_p->common.id); + if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) + goto flushed; + } + + ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); + reds_left = ERTS_BIF_REDS_LEFT(c_p); + + do { + int sreds = reds_left; + (void) erts_proc_sig_handle_incoming(c_p, &state, &sreds, + sreds, !0); + BUMP_REDS(c_p, (int) sreds); + if (state & ERTS_PSFLG_EXITING) + return -1; /* process exiting... */ + if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) + goto flushed; + reds_left -= sreds; + } while (reds_left > 0); + + return 1; /* yield and continue here later... */ +} + static BIF_RETTYPE process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) { @@ -1110,41 +1198,6 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) ERTS_CT_ASSERT(ERTS_PI_DEF_ARR_SZ > 0); - if (c_p->common.id == pid) { - int local_only = c_p->sig_qs.flags & FS_LOCAL_SIGS_ONLY; - int sres, sreds, reds_left; - - reds_left = ERTS_BIF_REDS_LEFT(c_p); - sreds = reds_left; - - if (!local_only) { - erts_proc_sig_queue_lock(c_p); - erts_proc_sig_fetch(c_p); - erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); - } - - sres = erts_proc_sig_handle_incoming(c_p, &state, &sreds, sreds, !0); - - BUMP_REDS(c_p, (int) sreds); - reds_left -= sreds; - - if (state & ERTS_PSFLG_EXITING) { - c_p->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY; - goto exited; - } - if (!sres | (reds_left <= 0)) { - /* - * More signals to handle or out of reds; need - * to yield and continue. Prevent fetching of - * more signals by setting local-sigs-only flag. - */ - c_p->sig_qs.flags |= FS_LOCAL_SIGS_ONLY; - goto yield; - } - - c_p->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY; - } - if (is_atom(opt)) { int ix = pi_arg2ix(opt); item_ix[0] = ix; @@ -1190,7 +1243,16 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) goto badarg; } - if (is_not_internal_pid(pid)) { + if (c_p->common.id == pid) { + int res = pi_maybe_flush_signals(c_p, flags); + if (res != 0) { + if (res > 0) + goto yield; + else + goto exited; + } + } + else if (is_not_internal_pid(pid)) { if (is_external_pid(pid) && external_pid_dist_entry(pid) == erts_this_dist_entry) goto undefined; @@ -1226,6 +1288,10 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) } if (flags & ERTS_PI_FLAG_NEED_MSGQ_LEN) { ASSERT(locks & ERTS_PROC_LOCK_MAIN); + if (rp->sig_qs.flags & FS_FLUSHING_SIGS) { + erts_proc_unlock(rp, locks); + goto send_signal; + } erts_proc_sig_queue_lock(rp); erts_proc_sig_fetch(rp); if (rp->sig_qs.cont) { @@ -1264,7 +1330,8 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) if (c_p == rp || !ERTS_PROC_HAS_INCOMING_SIGNALS(c_p)) ERTS_BIF_PREP_RET(ret, res); else - ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(ret, c_p, res); + ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(ret, c_p, + pid, res); done: @@ -1355,6 +1422,7 @@ process_info_aux(Process *c_p, Process *rp, ErtsProcLocks rp_locks, int item_ix, + Sint *msgq_len_p, int flags, Uint *reserve_sizep, Uint *reds) @@ -1468,8 +1536,10 @@ process_info_aux(Process *c_p, case ERTS_PI_IX_MESSAGES: { ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN); - if (rp->sig_qs.len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE)) + if (rp->sig_qs.len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE)) { + *msgq_len_p = 0; res = NIL; + } else { int info_on_self = !(flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER); ErtsMessageInfo *mip; @@ -1487,8 +1557,8 @@ process_info_aux(Process *c_p, heap_need = erts_proc_sig_prep_msgq_for_inspection(c_p, rp, rp_locks, info_on_self, - mip); - len = rp->sig_qs.len; + mip, msgq_len_p); + len = *msgq_len_p; heap_need += len*2; /* Cons cells */ @@ -1517,9 +1587,13 @@ process_info_aux(Process *c_p, } case ERTS_PI_IX_MESSAGE_QUEUE_LEN: { - Sint len = rp->sig_qs.len; + Sint len = *msgq_len_p; + if (len < 0) { + ASSERT((flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER) + || !rp->sig_qs.cont); + len = rp->sig_qs.len; + } ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN); - ASSERT((flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER) || !rp->sig_qs.cont); ASSERT(len >= 0); if (len <= MAX_SMALL) res = make_small(len); @@ -3690,42 +3764,54 @@ BIF_RETTYPE erts_internal_is_process_alive_2(BIF_ALIST_2) BIF_ERROR(BIF_P, BADARG); if (!erts_proc_sig_send_is_alive_request(BIF_P, BIF_ARG_1, BIF_ARG_2)) { if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, am_ok); + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, BIF_ARG_1, am_ok); } BIF_RET(am_ok); } BIF_RETTYPE is_process_alive_1(BIF_ALIST_1) { + if (is_internal_pid(BIF_ARG_1)) { - erts_aint32_t state; + BIF_RETTYPE result; Process *rp; if (BIF_ARG_1 == BIF_P->common.id) BIF_RET(am_true); rp = erts_proc_lookup_raw(BIF_ARG_1); - if (!rp) - BIF_RET(am_false); + if (!rp) { + result = am_false; + } + else { + erts_aint32_t state = erts_atomic32_read_acqb(&rp->state); + if (state & (ERTS_PSFLG_EXITING + | ERTS_PSFLG_SIG_Q + | ERTS_PSFLG_SIG_IN_Q)) { + /* + * If in exiting state, trap out and send 'is alive' + * request and wait for it to complete termination. + * + * If process has signals enqueued, we need to + * send it an 'is alive' request via its signal + * queue in order to ensure that signal order is + * preserved (we may earlier have sent it an + * exit signal that has not been processed yet). + */ + BIF_TRAP1(is_process_alive_trap, BIF_P, BIF_ARG_1); + } - state = erts_atomic32_read_acqb(&rp->state); - if (state & (ERTS_PSFLG_EXITING - | ERTS_PSFLG_SIG_Q - | ERTS_PSFLG_SIG_IN_Q)) { + result = am_true; + } + + if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) { /* - * If in exiting state, trap out and send 'is alive' - * request and wait for it to complete termination. - * - * If process has signals enqueued, we need to - * send it an 'is alive' request via its signal - * queue in order to ensure that signal order is - * preserved (we may earlier have sent it an - * exit signal that has not been processed yet). + * Ensure that signal order of signals from inspected + * process to us is preserved... */ - BIF_TRAP1(is_process_alive_trap, BIF_P, BIF_ARG_1); + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, BIF_ARG_1, result); } - - BIF_RET(am_true); + BIF_RET(result); } if (is_external_pid(BIF_ARG_1)) { @@ -3734,6 +3820,8 @@ BIF_RETTYPE is_process_alive_1(BIF_ALIST_1) } BIF_ERROR(BIF_P, BADARG); + + } static Eterm @@ -4218,10 +4306,10 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) ERTS_ASSERT_IS_NOT_EXITING(BIF_P); BIF_RET(am_undefined); } - erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); erts_proc_sig_fetch(p); erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + state = erts_atomic32_read_nob(&BIF_P->state); do { int reds = CONTEXT_REDS; sigs_done = erts_proc_sig_handle_incoming(p, @@ -4284,10 +4372,11 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) ERTS_ASSERT_IS_NOT_EXITING(BIF_P); BIF_RET(am_undefined); } - + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); erts_proc_sig_fetch(p); erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + state = erts_atomic32_read_nob(&BIF_P->state); do { int reds = CONTEXT_REDS; sigs_done = erts_proc_sig_handle_incoming(p, diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c index fc415aa4e5..737a6be15f 100644 --- a/erts/emulator/beam/erl_bif_port.c +++ b/erts/emulator/beam/erl_bif_port.c @@ -238,8 +238,17 @@ BIF_RETTYPE erts_internal_port_command_3(BIF_ALIST_3) } else { /* Ensure signal order is preserved... */ - if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) - ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(res, BIF_P, res); + if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(res, BIF_P, + from, res); + } } return res; @@ -287,8 +296,16 @@ BIF_RETTYPE erts_internal_port_call_3(BIF_ALIST_3) ERTS_BIF_EXITED(BIF_P); else { /* Ensure signal order is preserved... */ - if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } } BIF_RET(retval); @@ -335,8 +352,16 @@ BIF_RETTYPE erts_internal_port_control_3(BIF_ALIST_3) ERTS_BIF_EXITED(BIF_P); else { /* Ensure signal order is preserved... */ - if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } } BIF_RET(retval); @@ -382,8 +407,16 @@ BIF_RETTYPE erts_internal_port_close_1(BIF_ALIST_1) } /* Ensure signal order is preserved... */ - if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } BIF_RET(retval); } @@ -426,8 +459,16 @@ BIF_RETTYPE erts_internal_port_connect_2(BIF_ALIST_2) } /* Ensure signal order is preserved... */ - if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } BIF_RET(retval); } @@ -435,7 +476,7 @@ BIF_RETTYPE erts_internal_port_connect_2(BIF_ALIST_2) BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1) { Eterm retval; - Port* prt; + Port* prt = NULL; if (is_internal_port(BIF_ARG_1) || is_atom(BIF_ARG_1)) { prt = sig_lookup_port(BIF_P, BIF_ARG_1); @@ -474,8 +515,16 @@ BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1) } /* Ensure signal order is preserved... */ - if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } BIF_RET(retval); } @@ -484,7 +533,7 @@ BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1) BIF_RETTYPE erts_internal_port_info_2(BIF_ALIST_2) { Eterm retval; - Port* prt; + Port* prt = NULL; if (is_internal_port(BIF_ARG_1) || is_atom(BIF_ARG_1)) { prt = sig_lookup_port(BIF_P, BIF_ARG_1); @@ -523,8 +572,16 @@ BIF_RETTYPE erts_internal_port_info_2(BIF_ALIST_2) } /* Ensure signal order is preserved... */ - if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } BIF_RET(retval); } diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 151d0b41ad..0026ecce99 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -539,6 +539,9 @@ erts_queue_proc_message(Process* sender, Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* mp, Eterm msg) { + if (sender == receiver) + (void) erts_atomic32_read_bor_nob(&sender->state, + ERTS_PSFLG_MAYBE_SELF_SIGS); ERL_MESSAGE_TERM(mp) = msg; ERL_MESSAGE_FROM(mp) = sender->common.id; queue_messages(sender->common.id, receiver, receiver_locks, @@ -552,6 +555,9 @@ erts_queue_proc_messages(Process* sender, Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* first, ErtsMessage** last, Uint len) { + if (sender == receiver) + (void) erts_atomic32_read_bor_nob(&sender->state, + ERTS_PSFLG_MAYBE_SELF_SIGS); queue_messages(sender->common.id, receiver, receiver_locks, prepend_pending_sig_maybe(sender, receiver, first), last, len); diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 87cd96f4cd..3a7aba5a68 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -284,6 +284,7 @@ typedef struct { typedef struct { ErtsSignal sig; ErtsMessage **prev_next; + signed char is_yield_mark; signed char pass; signed char set_save; signed char in_sigq; diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 840c3cea23..c5caac0bce 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -216,11 +216,26 @@ typedef struct { } ErtsProcSigRPC; typedef struct { + ErtsRecvMarker next; + ErtsRecvMarker last; +} ErtsYieldAdjMsgQ; + +typedef struct { + ErtsYieldAdjMsgQ *yield; Eterm requester; Eterm request_id; } ErtsCLAData; -static void wait_handle_signals(Process *c_p); +typedef struct { + ErtsYieldAdjMsgQ *yield; +} ErtsAdjOffHeapMsgQData; + +typedef struct { + ErtsMessage *first; + ErtsMessage **last; +} ErtsSavedNMSignals; + +static erts_aint32_t wait_handle_signals(Process *c_p); static void wake_handle_signals(Process *proc); static int handle_msg_tracing(Process *c_p, @@ -243,12 +258,17 @@ static int handle_cla(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig, - int exiting); + int exiting, + int limit, + ErtsSavedNMSignals *saved_nm_sigs); + static int handle_move_msgq_off_heap(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig, - int exiting); + int exiting, + int limit, + ErtsSavedNMSignals *saved_nm_sigs); static void send_cla_reply(Process *c_p, ErtsMessage *sig, Eterm to, Eterm req_id, Eterm result); @@ -293,6 +313,76 @@ proc_sig_hdbg_check_queue(Process *c_p, #define ERTS_PROC_SIG_HDBG_PRIV_CHKQ(P, T, NMN) #endif +static void +save_delayed_nm_signal(ErtsSavedNMSignals *saved_sigs, ErtsMessage *sig) +{ + ErtsSignal *nm_sig = (ErtsSignal *) sig; + nm_sig->common.next = NULL; + nm_sig->common.specific.next = NULL; + if (!saved_sigs->first) { + ASSERT(!saved_sigs->last); + saved_sigs->first = sig; + saved_sigs->last = &saved_sigs->first; + } + else { + ErtsSignal *last; + ASSERT(saved_sigs->last); + last = (ErtsSignal *) *saved_sigs->last; + last->common.next = sig; + last->common.specific.next = &last->common.next; + saved_sigs->last = &last->common.next; + } +} + +static erts_aint32_t +restore_delayed_nm_signals(Process *c_p, ErtsSavedNMSignals *saved_sigs) +{ + erts_aint32_t state; + ErtsSignal *lsig; + + ASSERT(saved_sigs->first && saved_sigs->last); + + lsig = (ErtsSignal *) *saved_sigs->last; + if (!c_p->sig_qs.cont) { + ASSERT(!c_p->sig_qs.nmsigs.next); + ASSERT(!c_p->sig_qs.nmsigs.last); + if (saved_sigs->last == &saved_sigs->first) + c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont; + else + c_p->sig_qs.nmsigs.last = saved_sigs->last; + c_p->sig_qs.cont_last = &lsig->common.next; + } + else { + lsig->common.next = c_p->sig_qs.cont; + if (c_p->sig_qs.nmsigs.next) { + ASSERT(c_p->sig_qs.nmsigs.last); + if (c_p->sig_qs.nmsigs.next == &c_p->sig_qs.cont) + lsig->common.specific.next = &lsig->common.next; + else + lsig->common.specific.next = c_p->sig_qs.nmsigs.next; + if (c_p->sig_qs.nmsigs.last == &c_p->sig_qs.cont) + c_p->sig_qs.nmsigs.last = &lsig->common.next; + } + else { + ASSERT(!c_p->sig_qs.nmsigs.last); + if (saved_sigs->last == &saved_sigs->first) + c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont; + else + c_p->sig_qs.nmsigs.last = saved_sigs->last; + if (c_p->sig_qs.cont_last == &c_p->sig_qs.cont) + c_p->sig_qs.cont_last = &lsig->common.next; + } + } + + c_p->sig_qs.cont = saved_sigs->first; + c_p->sig_qs.nmsigs.next = &c_p->sig_qs.cont; + + state = erts_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_SIG_Q); + state |= ERTS_PSFLG_SIG_Q; + return state; +} + typedef struct { ErtsSignalCommon common; Eterm ref; @@ -389,6 +479,18 @@ get_cla_data(ErtsMessage *sig) + sig->hfrag.used_size); } +static ERTS_INLINE ErtsAdjOffHeapMsgQData * +get_move_msgq_off_heap_data(ErtsMessage *sig) +{ + ASSERT(ERTS_SIG_IS_NON_MSG(sig)); + ASSERT(ERTS_PROC_SIG_OP(((ErtsSignal *) sig)->common.tag) + == ERTS_SIG_Q_OP_ADJ_MSGQ); + ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag) + == ERTS_SIG_Q_TYPE_OFF_HEAP); + return (ErtsAdjOffHeapMsgQData *) (char *) (&sig->hfrag.mem[0] + + sig->hfrag.used_size); +} + static ERTS_INLINE void destroy_trace_info(ErtsSigTraceInfo *ti) { @@ -598,7 +700,7 @@ enqueue_signals(Process *rp, ErtsMessage *first, this = dest_queue->last; - if ( ! is_to_buffer ){ + if (!is_to_buffer) { ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp); } @@ -664,7 +766,9 @@ enqueue_signals(Process *rp, ErtsMessage *first, dest_queue->len += num_msgs; - ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp); + if (!is_to_buffer) { + ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp); + } return state; } @@ -729,7 +833,7 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, /* Tracing requires sender for local procs and ports. The assertions below * will not catch errors after time-of-death, but ought to find most * problems. */ - ASSERT(sender != NULL || + ASSERT(sender != NULL || op == ERTS_SIG_Q_OP_FLUSH || (is_normal_sched && esdp->pending_signal.sig == sig) || (!(is_internal_pid(from) && erts_proc_lookup(from) != NULL) && @@ -824,6 +928,11 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, sigp = &first; first_last_done: + + if ((void *) sender == (void *) rp) + (void) erts_atomic32_read_bor_nob(&((Process *) sender)->state, + ERTS_PSFLG_MAYBE_SELF_SIGS); + sig->common.specific.next = NULL; /* may add signals before sig */ @@ -1463,22 +1572,17 @@ erts_proc_sig_cleanup_non_msg_signal(ErtsMessage *sig) Eterm tag = ((ErtsSignal *) sig)->common.tag; /* - * Heap alias message, heap frag alias message and - * adjust message queue signals are the only non-message - * signals, which are allocated as messages, which do not - * use a combined message / heap fragment. + * Heap alias message and heap frag alias message are + * the only non-message signals, which are allocated as + * messages, which do not use a combined message / heap + * fragment. */ - if (ERTS_SIG_IS_HEAP_ALIAS_MSG_TAG(tag) - || tag == ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ADJ_MSGQ, - ERTS_SIG_Q_TYPE_OFF_HEAP, - 0)) { + if (ERTS_SIG_IS_HEAP_ALIAS_MSG_TAG(tag)) { sig->data.heap_frag = NULL; return; } - - - if(ERTS_SIG_IS_HEAP_FRAG_ALIAS_MSG_TAG(tag)) { + if (ERTS_SIG_IS_HEAP_FRAG_ALIAS_MSG_TAG(tag)) { /* Retrieve pointer to heap fragment (may not be NULL). */ void *attached; (void) get_alias_msg_data(sig, NULL, NULL, NULL, &attached); @@ -1489,10 +1593,45 @@ erts_proc_sig_cleanup_non_msg_signal(ErtsMessage *sig) /* * Using a combined heap fragment... */ - ErtsDistExternal *edep = get_external_non_msg_signal(sig); - if (edep) - erts_free_dist_ext_copy(edep); - + switch (ERTS_PROC_SIG_OP(tag)) { + + case ERTS_SIG_Q_OP_ADJ_MSGQ: { + /* We need to deallocate yield markers if such has been used... */ + ErtsYieldAdjMsgQ *yp; + switch (ERTS_PROC_SIG_TYPE(tag)) { + case ERTS_SIG_Q_TYPE_CLA: { + ErtsCLAData *cla = get_cla_data(sig); + yp = cla->yield; + cla->yield = NULL; + break; + } + case ERTS_SIG_Q_TYPE_OFF_HEAP: { + ErtsAdjOffHeapMsgQData *ohdp = get_move_msgq_off_heap_data(sig); + yp = ohdp->yield; + ohdp->yield = NULL; + break; + } + default: + ERTS_INTERNAL_ERROR("Invalid adjust-message-queue signal type"); + yp = NULL; + break; + } + if (yp) { + ASSERT(!yp->next.in_msgq && !yp->next.in_sigq); + ASSERT(!yp->last.in_msgq && !yp->last.in_sigq); + erts_free(ERTS_ALC_T_SIG_YIELD_DATA, yp); + } + break; + } + + default: { + ErtsDistExternal *edep = get_external_non_msg_signal(sig); + if (edep) + erts_free_dist_ext_copy(edep); + break; + } + } + sig->data.attached = ERTS_MSG_COMBINED_HFRAG; hfrag = sig->hfrag.next; erts_cleanup_offheap(&sig->hfrag.off_heap); @@ -2712,6 +2851,7 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id) cla = (ErtsCLAData *) (char *) hp; hfrag->used_size = hp - start_hp; + cla->yield = NULL; cla->requester = c_p->common.id; cla->request_id = req_id_cpy; @@ -2733,8 +2873,20 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id) void erts_proc_sig_send_move_msgq_off_heap(Eterm to) { - ErtsMessage *sig = erts_alloc_message(0, NULL); + ErtsMessage *sig; + Eterm *hp; + Uint hsz; + ErtsAdjOffHeapMsgQData *ohdp; ASSERT(is_internal_pid(to)); + + hsz = sizeof(ErtsAdjOffHeapMsgQData)/sizeof(Uint); + sig = erts_alloc_message(hsz, &hp); + + ohdp = (ErtsAdjOffHeapMsgQData *) (char *) hp; + ohdp->yield = NULL; + + sig->hfrag.used_size = 0; + ERL_MESSAGE_TERM(sig) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ADJ_MSGQ, ERTS_SIG_Q_TYPE_OFF_HEAP, 0); @@ -2751,6 +2903,83 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to) } } +void +erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm id) +{ + int force_flush_buffers = 0, enqueue_mq, fetch_sigs; + ErtsSignal *sig; + + ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + + ASSERT(!(c_p->sig_qs.flags & (FS_FLUSHING_SIGS|FS_FLUSHED_SIGS))); + ASSERT(flags); + ASSERT((flags & ~ERTS_PROC_SIG_FLUSH_FLGS) == 0); + ASSERT(!(flags & ERTS_PROC_SIG_FLUSH_FLG_FROM_ID) + || is_internal_pid(id) || is_internal_port(id)); + + sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSignalCommon)); + sig->common.next = NULL; + sig->common.specific.attachment = NULL; + sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_FLUSH, + ERTS_SIG_Q_TYPE_UNDEFINED, + 0); + switch (flags) { + case ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL: + id = c_p->common.id; + force_flush_buffers = !0; + /* Fall through... */ + case ERTS_PROC_SIG_FLUSH_FLG_FROM_ID: + if (!proc_queue_signal(NULL, id, c_p->common.id, sig, + force_flush_buffers, ERTS_SIG_Q_OP_FLUSH)) + ERTS_INTERNAL_ERROR("Failed to send flush signal to ourselves"); + enqueue_mq = 0; + fetch_sigs = !0; + break; + case ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ: + enqueue_mq = !0; + fetch_sigs = 0; + break; + default: + enqueue_mq = !!(flags & ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ); + fetch_sigs = !0; + break; + } + + erts_set_gc_state(c_p, 0); + + if (fetch_sigs) { + erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(c_p); + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + } + + c_p->sig_qs.flags |= FS_FLUSHING_SIGS; + + if (enqueue_mq) { + if (!c_p->sig_qs.cont) { + c_p->sig_qs.flags |= FS_FLUSHED_SIGS; + erts_free(ERTS_ALC_T_SIG_DATA, sig); + } + else { + if (!c_p->sig_qs.nmsigs.last) { + ASSERT(!c_p->sig_qs.nmsigs.next); + c_p->sig_qs.nmsigs.next = c_p->sig_qs.cont_last; + } + else { + ErtsSignal *lsig = (ErtsSignal *) *c_p->sig_qs.nmsigs.last; + ASSERT(c_p->sig_qs.nmsigs.next); + ASSERT(lsig && !lsig->common.specific.next); + lsig->common.specific.next = c_p->sig_qs.cont_last; + } + + c_p->sig_qs.nmsigs.last = c_p->sig_qs.cont_last; + *c_p->sig_qs.cont_last = (ErtsMessage *) sig; + c_p->sig_qs.cont_last = &sig->common.next; + } + } + +} + static int handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp) { @@ -2937,7 +3166,7 @@ remove_iq_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig) static ERTS_INLINE void remove_mq_sig(Process *c_p, ErtsMessage *sig, - ErtsMessage **next_sig, ErtsMessage ***next_nm_sig) + ErtsMessage **next_sig, ErtsMessage ***next_nm_sig) { /* * Remove signal from (middle) signal queue. @@ -3165,6 +3394,8 @@ recv_marker_deallocate(Process *c_p, ErtsRecvMarker *markp) ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk; int ix, nix; + ASSERT(!markp->is_yield_mark); + ASSERT(blkp); ERTS_HDBG_CHK_RECV_MRKS(c_p); @@ -3215,6 +3446,7 @@ recv_marker_dequeue(Process *c_p, ErtsRecvMarker *markp) { ErtsMessage *sigp; + ASSERT(!markp->is_yield_mark); ASSERT(markp->proc == c_p); if (markp->in_sigq <= 0) { @@ -3280,6 +3512,7 @@ recv_marker_alloc_block(Process *c_p, ErtsRecvMarkerBlock **blkpp, /* Allocate marker for 'uniqp' in index zero... */ *ixp = 0; blkp->ref[0] = recv_marker_uniq(c_p, uniqp); + blkp->marker[0].is_yield_mark = 0; markp = &blkp->marker[0]; markp->next_ix = markp->prev_ix = 0; blkp->used_ix = 0; @@ -3295,6 +3528,7 @@ recv_marker_alloc_block(Process *c_p, ErtsRecvMarkerBlock **blkpp, blkp->free_ix = 1; for (ix = 1; ix < ERTS_RECV_MARKER_BLOCK_SIZE; ix++) { blkp->ref[ix] = am_free; + blkp->marker[ix].is_yield_mark = 0; if (ix == ERTS_RECV_MARKER_BLOCK_SIZE - 1) blkp->marker[ix].next_ix = -1; /* End of list */ else @@ -3562,20 +3796,29 @@ erts_msgq_recv_marker_create_insert_set_save(Process *c_p, Eterm id) } void -erts_msgq_remove_leading_recv_markers(Process *c_p) +erts_msgq_remove_leading_recv_markers_set_save_first(Process *c_p) { + ErtsMessage **save; /* * Receive markers in the front of the queue does not - * add any value, so we just remove them... + * add any value, so we just remove them. We need to + * keep and pass yield markers though... */ ASSERT(c_p->sig_qs.first && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first)); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); + save = &c_p->sig_qs.first; do { - ErtsRecvMarker *markp = (ErtsRecvMarker *) c_p->sig_qs.first; - recv_marker_dequeue(c_p, markp); - } while (c_p->sig_qs.first - && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first)); + ErtsRecvMarker *markp = (ErtsRecvMarker *) *save; + if (markp->is_yield_mark) + save = &markp->sig.common.next; + else + recv_marker_dequeue(c_p, markp); + } while (*save && ERTS_SIG_IS_RECV_MARKER(*save)); + + c_p->sig_qs.save = save; + + ASSERT(!*save || ERTS_SIG_IS_MSG(*save)); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); } @@ -3587,7 +3830,8 @@ erts_msgq_pass_recv_markers(Process *c_p, ErtsMessage **markpp) ASSERT(ERTS_SIG_IS_RECV_MARKER(sigp)); do { ErtsRecvMarker *markp = (ErtsRecvMarker *) sigp; - if (++markp->pass > ERTS_RECV_MARKER_PASS_MAX) { + if (!markp->is_yield_mark + && ++markp->pass > ERTS_RECV_MARKER_PASS_MAX) { recv_marker_dequeue(c_p, markp); sigp = *sigpp; } @@ -5147,22 +5391,22 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, int *redsp, int max_reds, int local_only) { Eterm tag; - erts_aint32_t state; + erts_aint32_t state = *statep; int yield, cnt, limit, abs_lim, msg_tracing, save_in_msgq; ErtsMessage *sig, ***next_nm_sig; ErtsSigRecvTracing tracing; - + ErtsSavedNMSignals delayed_nm_signals = {0}; + ASSERT(!(c_p->sig_qs.flags & FS_WAIT_HANDLE_SIGS)); if (c_p->sig_qs.flags & FS_HANDLING_SIGS) - wait_handle_signals(c_p); + state = wait_handle_signals(c_p); else c_p->sig_qs.flags |= FS_HANDLING_SIGS; ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); - state = erts_atomic32_read_nob(&c_p->state); - if (!local_only) { + if (!local_only && !(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) { if (ERTS_PSFLG_SIG_IN_Q & state) { erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); @@ -5674,21 +5918,57 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); break; - case ERTS_SIG_Q_OP_ADJ_MSGQ: + case ERTS_SIG_Q_OP_ADJ_MSGQ: { + int adj_limit, adj_cnt, min_adj_limit; + /* + * This may require a substantial amount of work and we + * want to get it over and done with in a reasonable + * amount of time, so we bump up the limit for it a bit... + */ + min_adj_limit = ERTS_SIG_REDS_CNT_FACTOR*CONTEXT_REDS/6; + if (sig->next) + adj_limit = min_adj_limit; + else { + adj_limit = limit - cnt; + if (adj_limit < min_adj_limit) + adj_limit = min_adj_limit; + } ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); switch (ERTS_PROC_SIG_TYPE(tag)) { case ERTS_SIG_Q_TYPE_CLA: - cnt += handle_cla(c_p, sig, next_nm_sig, 0); + adj_cnt = handle_cla(c_p, sig, next_nm_sig, 0, adj_limit, + &delayed_nm_signals); break; case ERTS_SIG_Q_TYPE_OFF_HEAP: - cnt += handle_move_msgq_off_heap(c_p, sig, next_nm_sig, 0); + adj_cnt = handle_move_msgq_off_heap(c_p, sig, next_nm_sig, + 0, adj_limit, + &delayed_nm_signals); break; default: - ERTS_INTERNAL_ERROR("Invalid 'adjust-message-queue' signal type"); + ERTS_INTERNAL_ERROR("Invalid adjust-message-queue signal type"); break; } + cnt += adj_cnt; + limit += adj_cnt; + if (limit > abs_lim) + abs_lim = limit; ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); break; + } + + case ERTS_SIG_Q_OP_FLUSH: + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); + c_p->sig_qs.flags |= FS_FLUSHED_SIGS; + remove_nm_sig(c_p, sig, next_nm_sig); + erts_free(ERTS_ALC_T_SIG_DATA, sig); + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + /* + * The caller has been exclusively handling signals until this + * point. Break out and let the process continue with other + * things as well... + */ + goto stop; case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: { Uint16 type = ERTS_PROC_SIG_TYPE(tag); @@ -5719,6 +5999,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, case ERTS_SIG_Q_OP_RECV_MARK: { ErtsRecvMarker *markp = (ErtsRecvMarker *) sig; ASSERT(markp->in_sigq); + ASSERT(!markp->is_yield_mark); if (markp->in_sigq < 0) { /* Marked for removal... */ @@ -5806,7 +6087,7 @@ stop: { *next_nm_sig = &c_p->sig_qs.cont; if (c_p->sig_qs.nmsigs.last == tracing.messages.next) c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont; - *statep = erts_atomic32_read_nob(&c_p->state); + state = erts_atomic32_read_nob(&c_p->state); } else { ASSERT(!c_p->sig_qs.nmsigs.next); @@ -5814,7 +6095,6 @@ stop: { state = erts_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_SIG_Q); state &= ~ERTS_PSFLG_SIG_Q; - *statep = state; } if (tracing.messages.next != &c_p->sig_qs.cont) { @@ -5860,7 +6140,7 @@ stop: { ASSERT(c_p->sig_qs.cont); - *statep = erts_atomic32_read_nob(&c_p->state); + state = erts_atomic32_read_nob(&c_p->state); res = 0; } @@ -5893,10 +6173,36 @@ stop: { state = erts_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_SIG_Q); state &= ~ERTS_PSFLG_SIG_Q; - *statep = state; res = !0; } + if (!!(state & ERTS_PSFLG_MAYBE_SELF_SIGS) + & !(state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))) { + /* + * We know we do not have any outstanding signals + * from ourselves... + */ + state = erts_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_MAYBE_SELF_SIGS); + state &= ~ERTS_PSFLG_MAYBE_SELF_SIGS; + } + + if (delayed_nm_signals.first) { + /* + * We do this after clearing ERTS_PSFLG_MAYBE_SELF_SIGS + * since there currently are no signals that can be delayed + * that should be counted as originating from the process + * itself. If such signals appear in the future this has to + * be accounted for... + * + * The adjust message queue data "signal" does originate from + * the process itself, but it is not conseptually a signal. + */ + state = restore_delayed_nm_signals(c_p, &delayed_nm_signals); + } + + *statep = state; + /* Ensure that 'save' doesn't point to a receive marker... */ if (*c_p->sig_qs.save && ERTS_SIG_IS_RECV_MARKER(*c_p->sig_qs.save)) { @@ -5906,7 +6212,7 @@ stop: { ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); - *redsp = cnt/4 + 1; + *redsp = cnt/ERTS_SIG_REDS_CNT_FACTOR + 1; if (yield) { int vreds = max_reds - *redsp; @@ -6168,23 +6474,31 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp, case ERTS_SIG_Q_OP_ADJ_MSGQ: switch (ERTS_PROC_SIG_TYPE(tag)) { case ERTS_SIG_Q_TYPE_CLA: - handle_cla(c_p, sig, next_nm_sig, !0); + handle_cla(c_p, sig, next_nm_sig, !0, limit, NULL); break; case ERTS_SIG_Q_TYPE_OFF_HEAP: - handle_move_msgq_off_heap(c_p, sig, next_nm_sig, !0); + handle_move_msgq_off_heap(c_p, sig, next_nm_sig, !0, + limit, NULL); break; default: - ERTS_INTERNAL_ERROR("Invalid 'adjust-message-queue' signal type"); + ERTS_INTERNAL_ERROR("Invalid adjust-message-queue signal type"); break; } break; + case ERTS_SIG_Q_OP_FLUSH: + ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); + c_p->sig_qs.flags |= FS_FLUSHED_SIGS; + erts_free(ERTS_ALC_T_SIG_DATA, sig); + break; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: destroy_trace_info((ErtsSigTraceInfo *) sig); break; case ERTS_SIG_Q_OP_RECV_MARK: { ErtsRecvMarker *markp = (ErtsRecvMarker *) sig; + ASSERT(!markp->is_yield_mark); markp->in_msgq = markp->in_sigq = markp->set_save = 0; recv_marker_deallocate(c_p, markp); break; @@ -6276,6 +6590,7 @@ clear_seq_trace_token(ErtsMessage *sig) case ERTS_SIG_Q_OP_RPC: case ERTS_SIG_Q_OP_RECV_MARK: case ERTS_SIG_Q_OP_ADJ_MSGQ: + case ERTS_SIG_Q_OP_FLUSH: break; default: @@ -6288,8 +6603,33 @@ clear_seq_trace_token(ErtsMessage *sig) void erts_proc_sig_clear_seq_trace_tokens(Process *c_p) { - erts_proc_sig_fetch(c_p); - ERTS_FOREACH_SIG_PRIVQS(c_p, sig, clear_seq_trace_token(sig)); + int ix; + ErtsSignalInQueueBufferArray *bap; + int unget_info; + ErtsMessage *qs[] = {c_p->sig_qs.first, + c_p->sig_qs.cont, + c_p->sig_inq.first}; + + ERTS_LC_ASSERT(erts_thr_progress_is_blocking()); + + for (ix = 0; ix < sizeof(qs)/sizeof(qs[0]); ix++) { + ErtsMessage *sigp; + for (sigp = qs[ix]; sigp; sigp = sigp->next) + clear_seq_trace_token(sigp); + } + + bap = erts_proc_sig_queue_get_buffers(c_p, &unget_info); + if (bap) { + for (ix = 0; ix < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; ix++) { + ErtsSignalInQueueBuffer* bp = &bap->slots[ix]; + if (bp->b.alive) { + ErtsMessage *sigp; + for (sigp = bp->b.queue.first; sigp; sigp = sigp->next) + clear_seq_trace_token(sigp); + } + } + erts_proc_sig_queue_unget_buffers(bap, unget_info); + } } Uint @@ -6336,11 +6676,6 @@ erts_proc_sig_signal_size(ErtsSignal *sig) break; case ERTS_SIG_Q_OP_ADJ_MSGQ: - if (type == ERTS_SIG_Q_TYPE_OFF_HEAP) { - size = sizeof(ErtsMessageRef); - break; - } - /* Fall through... */ case ERTS_SIG_Q_OP_SYNC_SUSPEND: case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: case ERTS_SIG_Q_OP_IS_ALIVE: @@ -6415,6 +6750,10 @@ erts_proc_sig_signal_size(ErtsSignal *sig) break; } + case ERTS_SIG_Q_OP_FLUSH: + size = sizeof(ErtsSignalCommon); + break; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: size = sizeof(ErtsSigTraceInfo); break; @@ -6533,6 +6872,7 @@ erts_proc_sig_receive_helper(Process *c_p, if (max_reds < reds) max_reds = reds; #endif + state = erts_atomic32_read_nob(&c_p->state); (void) erts_proc_sig_handle_incoming(c_p, &state, &reds, max_reds, !0); consumed_reds += reds; @@ -6571,7 +6911,6 @@ erts_proc_sig_receive_helper(Process *c_p, if (left_reds <= 0) break; /* yield */ - ASSERT(!c_p->sig_qs.cont); /* Go fetch again... */ } @@ -6584,6 +6923,127 @@ erts_proc_sig_receive_helper(Process *c_p, return consumed_reds; } +static void +init_yield_marker(Process *c_p, ErtsRecvMarker *mrkp) +{ + mrkp->prev_next = NULL; + mrkp->is_yield_mark = (char) !0; + mrkp->pass = (char) 100; + mrkp->set_save = (char) 0; + mrkp->in_sigq = (char) 0; + mrkp->in_msgq = (char) 0; + mrkp->prev_ix = (char) -100; + mrkp->next_ix = (char) -100; +#ifdef DEBUG + mrkp->used = (char) !0; + mrkp->proc = c_p; +#endif + mrkp->sig.common.next = NULL; + mrkp->sig.common.specific.attachment = NULL; + mrkp->sig.common.tag = ERTS_RECV_MARKER_TAG; +} + +static void +remove_yield_marker(Process *c_p, ErtsRecvMarker *mrkp) +{ + ASSERT(mrkp); + ASSERT(mrkp->is_yield_mark); + ASSERT(mrkp->in_msgq); + remove_iq_sig(c_p, (ErtsMessage *) mrkp, mrkp->prev_next); + mrkp->in_msgq = 0; + mrkp->in_sigq = 0; + mrkp->prev_next = NULL; + mrkp->sig.common.next = NULL; +} + +static ErtsYieldAdjMsgQ * +create_yield_adj_msgq_data(Process *c_p) +{ + ErtsYieldAdjMsgQ *yp = erts_alloc(ERTS_ALC_T_SIG_YIELD_DATA, + sizeof(ErtsYieldAdjMsgQ)); + init_yield_marker(c_p, &yp->next); + init_yield_marker(c_p, &yp->last); + return yp; +} + +static ERTS_INLINE void +insert_adj_msgq_yield_markers(Process *c_p, + ErtsYieldAdjMsgQ *yp, + ErtsMessage **nextpp, + ErtsMessage ***next_nm_sig, + ErtsSavedNMSignals *saved_sigs) +{ + ErtsMessage *sig, *nextp; + + ASSERT(yp); + ASSERT(nextpp); + ASSERT(next_nm_sig && *next_nm_sig && **next_nm_sig); + ASSERT(!yp->next.in_msgq); + + sig = **next_nm_sig; + + ASSERT(ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(sig)) + == ERTS_SIG_Q_OP_ADJ_MSGQ); + + /* + * Insert 'next' yield marker. This is in the inner queue or + * in the beginning of the middle queue where we've already + * begun using 'prev_next' pointers for receive markers, + * so if a receive marker follow, we need to update it. + */ + yp->next.in_msgq = !0; + yp->next.in_sigq = !0; + yp->next.prev_next = nextpp; + yp->next.sig.common.next = nextp = *nextpp; + *nextpp = (ErtsMessage *) &yp->next; + + ERTS_SIG_DBG_RECV_MARK_SET_HANDLED(&yp->next); + + if (nextp && ERTS_SIG_IS_RECV_MARKER(nextp)) { + ErtsRecvMarker *next_mrkp = (ErtsRecvMarker *) nextp; + next_mrkp->prev_next = &yp->next.sig.common.next; + } + + if (yp->last.in_msgq) { + remove_nm_sig(c_p, sig, next_nm_sig); + } + else { + /* + * Replace adj-msgq signal with 'last' yield marker. + * + * This is in the middle queue after the point where + * we've begun using 'prev_next' pointers for receive + * markers, so if a receive marker follow, we do not + * need to adjust its 'prev_next'. + */ + ErtsMessage **next_sig = *next_nm_sig; + yp->last.in_msgq = (char) !0; + yp->last.in_sigq = (char) !0; + yp->last.prev_next = next_sig; + *next_nm_sig = ((ErtsSignal *) sig)->common.specific.next; + *next_sig = (ErtsMessage *) &yp->last; + remove_mq_sig(c_p, sig, &yp->last.sig.common.next, next_nm_sig); + + ERTS_SIG_DBG_RECV_MARK_SET_HANDLED(&yp->last); + } + + save_delayed_nm_signal(saved_sigs, sig); +} + +static ERTS_INLINE void +destroy_adj_msgq_yield_markers(Process *c_p, ErtsYieldAdjMsgQ **ypp) +{ + ErtsYieldAdjMsgQ *yp = *ypp; + if (yp) { + if (yp->next.in_msgq) + remove_yield_marker(c_p, &yp->next); + if (yp->last.in_msgq) + remove_yield_marker(c_p, &yp->last); + erts_free(ERTS_ALC_T_SIG_YIELD_DATA, yp); + *ypp = NULL; + } +} + static Uint area_literal_size(Eterm* start, Eterm* end, char* lit_start, Uint lit_size) { @@ -6705,17 +7165,16 @@ static int handle_cla(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig, - int exiting) + int exiting, + int limit, + ErtsSavedNMSignals *saved_nm_sigs) { - /* - * TODO: Implement yielding support! - */ ErtsCLAData *cla; - ErtsMessage *msg; + ErtsMessage *msg, *endp; ErtsLiteralArea *la; char *literals; Uint lit_bsize; - int nmsgs, reds; + int nmsgs, reds, stretch_yield_limit = 0; Eterm result = am_ok; Uint64 cnt = 0; @@ -6736,6 +7195,30 @@ handle_cla(Process *c_p, * can therefore occur behind this signal. */ + msg = c_p->sig_qs.first; + if (!msg) + msg = c_p->sig_qs.cont; + + if (!cla->yield) { + endp = sig; + } + else { + if (!cla->yield->next.in_msgq) { + /* All messages already handled... */ + ASSERT(!cla->yield->last.in_msgq); + stretch_yield_limit = !0; + endp = msg = sig; + } + else { + ASSERT(!!cla->yield->last.in_msgq); + msg = cla->yield->next.sig.common.next; + endp = (ErtsMessage *) &cla->yield->last; + remove_yield_marker(c_p, &cla->yield->next); + } + } + + ASSERT(!cla->yield || !cla->yield->next.in_msgq); + la = ERTS_COPY_LITERAL_AREA(); if (!la) { ASSERT(0); @@ -6748,12 +7231,8 @@ handle_cla(Process *c_p, literals = (char *) &la->start[0]; lit_bsize = (char *) la->end - literals; - msg = c_p->sig_qs.first; - if (!msg) - msg = c_p->sig_qs.cont; - nmsgs = 0; - while (msg != sig) { + while (msg != endp) { ASSERT(!!msg); nmsgs++; if (nmsgs >= ERTS_PROC_SIG_ADJ_MSGQ_MSGS_FACTOR) { @@ -6848,6 +7327,18 @@ handle_cla(Process *c_p, } } + if (cnt > limit) { /* yield... */ + ErtsMessage **nextpp = !msg->next ? &c_p->sig_qs.cont : &msg->next; + ASSERT(*nextpp); + if (*nextpp == endp) + break; /* we're at the end; no point yielding here... */ + if (!cla->yield) + cla->yield = create_yield_adj_msgq_data(c_p); + insert_adj_msgq_yield_markers(c_p, cla->yield, nextpp, + next_nm_sig, saved_nm_sigs); + return cnt; + } + msg = msg->next; if (!msg) msg = c_p->sig_qs.cont; @@ -6855,18 +7346,36 @@ handle_cla(Process *c_p, remove_nm_sig(c_p, sig, next_nm_sig); - reds = 0; - if (erts_check_copy_literals_gc_need(c_p, &reds, literals, lit_bsize)) - result = am_need_gc; - - cnt += reds * ERTS_SIG_REDS_CNT_FACTOR; + reds = erts_check_copy_literals_gc_need_max_reds(c_p); + cnt++; + if (reds > CONTEXT_REDS) + result = am_check_gc; + else if (stretch_yield_limit + || cnt + reds*ERTS_SIG_REDS_CNT_FACTOR <= limit) { + reds = 0; + if (erts_check_copy_literals_gc_need(c_p, &reds, literals, lit_bsize)) + result = am_need_gc; + cnt += reds * ERTS_SIG_REDS_CNT_FACTOR; + } + else { + /* yield... */ + if (!cla->yield) + cla->yield = create_yield_adj_msgq_data(c_p); + else if (!!cla->yield->last.in_msgq) + remove_yield_marker(c_p, &cla->yield->last); + ASSERT(!cla->yield->next.in_msgq); + save_delayed_nm_signal(saved_nm_sigs, sig); + return cnt; + } done: + destroy_adj_msgq_yield_markers(c_p, &cla->yield); + send_cla_reply(c_p, sig, cla->requester, cla->request_id, result); - if (cnt > CONTEXT_REDS) - return CONTEXT_REDS; + if (cnt > CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR) + return CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR; return cnt; } @@ -6874,12 +7383,12 @@ static int handle_move_msgq_off_heap(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig, - int exiting) + int exiting, + int limit, + ErtsSavedNMSignals *saved_nm_sigs) { - /* - * TODO: Implement yielding support! - */ - ErtsMessage *msg; + ErtsAdjOffHeapMsgQData *ohdp; + ErtsMessage *msg, *endp; int nmsgs; Uint64 cnt = 0; @@ -6899,6 +7408,8 @@ handle_move_msgq_off_heap(Process *c_p, cnt++; + ohdp = get_move_msgq_off_heap_data(sig); + if (exiting) { /* signal already removed from queue... */ goto cleanup; @@ -6919,8 +7430,21 @@ handle_move_msgq_off_heap(Process *c_p, if (!msg) msg = c_p->sig_qs.cont; + if (!ohdp->yield) { + endp = sig; + } + else { + ASSERT(!!ohdp->yield->next.in_msgq); + ASSERT(!!ohdp->yield->last.in_msgq); + msg = ohdp->yield->next.sig.common.next; + endp = (ErtsMessage *) &ohdp->yield->last; + remove_yield_marker(c_p, &ohdp->yield->next); + } + + ASSERT(!ohdp->yield || !ohdp->yield->next.in_msgq); + nmsgs = 0; - while (msg != sig) { + while (msg != endp) { ASSERT(!!msg); nmsgs++; if (nmsgs >= ERTS_PROC_SIG_ADJ_MSGQ_MSGS_FACTOR) { @@ -6997,6 +7521,18 @@ handle_move_msgq_off_heap(Process *c_p, cnt += h_sz/ERTS_PROC_SIG_ADJ_MSGQ_COPY_FACTOR; } + if (cnt > limit) { /* yield... */ + ErtsMessage **nextpp = !msg->next ? &c_p->sig_qs.cont : &msg->next; + ASSERT(*nextpp); + if (*nextpp == endp) + break; /* we're at the end; no point yielding... */ + if (!ohdp->yield) + ohdp->yield = create_yield_adj_msgq_data(c_p); + insert_adj_msgq_yield_markers(c_p, ohdp->yield, nextpp, + next_nm_sig, saved_nm_sigs); + return cnt; + } + msg = msg->next; if (!msg) msg = c_p->sig_qs.cont; @@ -7008,13 +7544,15 @@ done: cleanup: + destroy_adj_msgq_yield_markers(c_p, &ohdp->yield); sig->next = NULL; + sig->data.attached = ERTS_MSG_COMBINED_HFRAG; erts_cleanup_messages(sig); c_p->sig_qs.flags &= ~FS_OFF_HEAP_MSGQ_CHNG; - if (cnt > CONTEXT_REDS) - return CONTEXT_REDS; + if (cnt > CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR) + return CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR; return cnt; } @@ -7242,7 +7780,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, ErtsProcLocks rp_locks, int info_on_self, - ErtsMessageInfo *mip) + ErtsMessageInfo *mip, + Sint *msgq_len_p) { Uint tot_heap_size; ErtsMessage *mp, **mpp; @@ -7316,7 +7855,11 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, mp = mp->next; } - ASSERT(c_p->sig_qs.len == i); + + ASSERT(info_on_self || c_p->sig_qs.len == i); + ASSERT(!info_on_self || c_p->sig_qs.len >= i); + + *msgq_len_p = i; return tot_heap_size; } @@ -7405,7 +7948,6 @@ erts_internal_dirty_process_handle_signals_1(BIF_ALIST_1) BIF_RET(am_normal); /* will handle signals itself... */ } else { - erts_aint32_t state; int done; Eterm res = am_false; int reds = 0; @@ -7451,9 +7993,13 @@ erts_proc_sig_cleanup_queues(Process *c_p) while (sig) { ErtsMessage *free_sig = sig; sig = sig->next; - if (ERTS_SIG_IS_RECV_MARKER(free_sig)) - recv_marker_deallocate(c_p, (ErtsRecvMarker *) free_sig); + if (ERTS_SIG_IS_RECV_MARKER(free_sig)) { + ErtsRecvMarker *recv_mark = (ErtsRecvMarker *) free_sig; + ASSERT(!recv_mark->is_yield_mark); + recv_marker_deallocate(c_p, recv_mark); + } else { + ASSERT(ERTS_SIG_IS_MSG(free_sig)); free_sig->next = NULL; erts_cleanup_messages(free_sig); } @@ -7473,7 +8019,7 @@ erts_proc_sig_cleanup_queues(Process *c_p) /* Debug */ -static void +static erts_aint32_t wait_handle_signals(Process *c_p) { /* @@ -7523,6 +8069,8 @@ wait_handle_signals(Process *c_p) c_p->sig_qs.flags &= ~FS_WAIT_HANDLE_SIGS; c_p->sig_qs.flags |= FS_HANDLING_SIGS; + + return erts_atomic32_read_mb(&c_p->state); } static void @@ -7649,9 +8197,6 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, break; case ERTS_SIG_Q_OP_ADJ_MSGQ: - if (type == ERTS_SIG_Q_TYPE_OFF_HEAP) - break; - /* Fall through... */ case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg); break; @@ -7704,6 +8249,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, case ERTS_SIG_Q_OP_PROCESS_INFO: case ERTS_SIG_Q_OP_RECV_MARK: case ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK: + case ERTS_SIG_Q_OP_FLUSH: break; default: @@ -7993,7 +8539,10 @@ proc_sig_hdbg_check_queue(Process *proc, if (sig_psflg != ERTS_PSFLG_FREE) { erts_aint32_t state = erts_atomic32_read_nob(&proc->state); - ERTS_ASSERT(nm_sigs ? !!(state & sig_psflg) : !(state & sig_psflg)); + ERTS_ASSERT(nm_sigs + ? !!(state & sig_psflg) + : (!(state & sig_psflg) + || !!erts_atomic_read_nob(&proc->sig_inq_buffers))); } return msg_len; diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index b3aefff0b0..8faa79507f 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -110,7 +110,7 @@ typedef struct { * Note that not all signal are handled using this functionality! */ -#define ERTS_SIG_Q_OP_MAX 18 +#define ERTS_SIG_Q_OP_MAX 19 #define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */ #define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/ @@ -130,7 +130,8 @@ typedef struct { #define ERTS_SIG_Q_OP_ALIAS_MSG 15 #define ERTS_SIG_Q_OP_RECV_MARK 16 #define ERTS_SIG_Q_OP_UNLINK_ACK 17 -#define ERTS_SIG_Q_OP_ADJ_MSGQ ERTS_SIG_Q_OP_MAX +#define ERTS_SIG_Q_OP_ADJ_MSGQ 18 +#define ERTS_SIG_Q_OP_FLUSH ERTS_SIG_Q_OP_MAX #define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 10) @@ -1133,6 +1134,9 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to); * * @param[out] statep Pointer to process state after * signal handling. May not be NULL. + * The state should recently have + * been updated before calling + * this function. * * @param[in,out] redsp Pointer to an integer containing * reductions. On input, the amount @@ -1253,6 +1257,58 @@ erts_proc_sig_receive_helper(Process *c_p, int fcalls, int neg_o_reds, ErtsMessage **msgpp, int *get_outp); +/* + * CLEAN_SIGQ - Flush until middle queue is empty, i.e. + * the content of inner+middle queue equals + * the message queue. + */ +#define ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ (1 << 0) +/* + * FROM_ALL - Flush signals from all local senders (processes + * and ports). + */ +#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL (1 << 1) +/* + * FROM_ID - Flush signals from process or port identified + * by 'id'. + */ +#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ID (1 << 2) + +/* + * All erts_proc_sig_init_flush_signals() flags. + */ +#define ERTS_PROC_SIG_FLUSH_FLGS \ + (ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ \ + | ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL \ + | ERTS_PROC_SIG_FLUSH_FLG_FROM_ID) + +/** + * + * @brief Initialize flush of signals from another process or port + * + * Inserts a flush signal in the outer signal queue of + * current process and sets the FS_FLUSHING_SIGS flag in + * 'c_p->sig_qs.flags'. When the flush signal has been + * handled the FS_FLUSHED_SIGS flag is set as well. + * + * While the flushing is ongoing the process *should* only + * handle incoming signals and not execute Erlang code. When + * the functionality that initiated the flush detects that + * the flush is done by the FS_FLUSHED_SIGS flag being set, + * it should clear both the FS_FLUSHED_SIGS flag and the + * FS_FLUSHING_SIGS flag. + * + * @param[in] c_p Pointer to process struct of + * currently executing process. + * flags Flags indicating how to flush. + * (see above). + * from Identifier of sender to flush + * signals from in case the + * FROM_ID flag is set. + */ +void +erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm from); + /** * * @brief Fetch signals from the outer queue @@ -1370,12 +1426,16 @@ typedef struct { * * @param[in] mip Pointer to array of * ErtsMessageInfo structures. + * + * @param[out] msgq_lenp Pointer to integer containing + * amount of messages. */ Uint erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, ErtsProcLocks rp_locks, int info_on_self, - ErtsMessageInfo *mip); + ErtsMessageInfo *mip, + Sint *msgq_lenp); /** * @@ -1676,7 +1736,7 @@ Eterm erts_msgq_recv_marker_create_insert(Process *c_p, Eterm id); void erts_msgq_recv_marker_create_insert_set_save(Process *c_p, Eterm id); ErtsMessage **erts_msgq_pass_recv_markers(Process *c_p, ErtsMessage **markpp); -void erts_msgq_remove_leading_recv_markers(Process *c_p); +void erts_msgq_remove_leading_recv_markers_set_save_first(Process *c_p); #define ERTS_RECV_MARKER_IX__(BLKP, MRKP) \ ((int) ((MRKP) - &(BLKP)->marker[0])) @@ -1725,6 +1785,10 @@ erts_proc_sig_fetch(Process *proc) == (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_MSGQ))); + ASSERT(!(proc->sig_qs.flags & FS_FLUSHING_SIGS) + || ERTS_PROC_IS_EXITING(proc) + || ERTS_IS_CRASH_DUMPING); + ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0); @@ -2048,8 +2112,9 @@ erts_msgq_set_save_first(Process *c_p) * anymore... */ if (c_p->sig_qs.first && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first)) - erts_msgq_remove_leading_recv_markers(c_p); - c_p->sig_qs.save = &c_p->sig_qs.first; + erts_msgq_remove_leading_recv_markers_set_save_first(c_p); + else + c_p->sig_qs.save = &c_p->sig_qs.first; } ERTS_GLB_INLINE void diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index ff81b453a7..00b6d16862 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -6631,10 +6631,12 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, */ /* - * No normal execution until dirty CLA or hibernat has + * No normal execution until dirty CLA or hibernate has * been handled... */ - ASSERT(!(p->flags & (F_DIRTY_CLA | F_DIRTY_GC_HIBERNATE))); + ASSERT(!(p->flags & (F_DIRTY_CHECK_CLA + | F_DIRTY_CLA + | F_DIRTY_GC_HIBERNATE))); a = erts_atomic32_read_band_nob(&p->state, ~ERTS_PSFLG_DIRTY_ACTIVE_SYS); @@ -10063,27 +10065,35 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) /* On normal scheduler */ if (state & ERTS_PSFLG_RUNNING_SYS) { if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) { - int local_only = (!!(p->sig_qs.flags & FS_LOCAL_SIGS_ONLY) - & !(state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLGS_DIRTY_WORK))); - if (!local_only | !!(state & ERTS_PSFLG_SIG_Q)) { - int sig_reds; + int sig_reds; + /* + * If we have dirty work scheduled we allow + * usage of all reductions since we need to + * handle all signals before doing dirty + * work... + * + * If a BIF is flushing signals, we also allow + * usage of all reductions since the BIF cannot + * continue exectution until the flush + * completes... + */ + sig_reds = reds; + if (((state & (ERTS_PSFLGS_DIRTY_WORK + | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE) + && !(p->sig_qs.flags & FS_FLUSHING_SIGS)) { /* - * If we have dirty work scheduled we allow - * usage of all reductions since we need to - * handle all signals before doing dirty - * work... + * We are active, i.e., have erlang work to do, + * and have no dirty work and are not flushing + * limit amount of signal handling work... */ - if (state & ERTS_PSFLGS_DIRTY_WORK) - sig_reds = reds; - else - sig_reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED; - (void) erts_proc_sig_handle_incoming(p, - &state, - &sig_reds, - sig_reds, - local_only); - reds -= sig_reds; + sig_reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED; } + (void) erts_proc_sig_handle_incoming(p, + &state, + &sig_reds, + sig_reds, + 0); + reds -= sig_reds; } if ((state & (ERTS_PSFLG_SYS_TASKS | ERTS_PSFLG_EXITING)) == ERTS_PSFLG_SYS_TASKS) { @@ -10093,8 +10103,14 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) * hand written beam assembly in * prim_eval:'receive'. If GC is delayed we are * not allowed to execute system tasks. + * + * We also don't allow execution of system tasks + * if a BIF is flushing signals, since there are + * system tasks that might need to fetch from the + * outer signal queue... */ - if (!(p->flags & F_DELAY_GC)) { + if (!(p->flags & F_DELAY_GC) + && !(p->sig_qs.flags & FS_FLUSHING_SIGS)) { int cost = execute_sys_tasks(p, &state, reds); calls += cost; reds -= cost; @@ -10591,25 +10607,49 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) int fcalls; int cla_reds = 0; - if (!ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) - fcalls = reds; - else - fcalls = reds - CONTEXT_REDS; - st_res = erts_copy_literals_gc(c_p, &cla_reds, fcalls); - reds -= cla_reds; - if (is_non_value(st_res)) { - if (c_p->flags & F_DIRTY_CLA) { - save_dirty_task(c_p, st); - st = NULL; - break; - } - /* Needed gc, but gc was disabled */ - save_gc_task(c_p, st, st_prio); - st = NULL; - break; - } - /* We did a major gc */ - minor_gc = major_gc = 1; + if (st->arg[0] == am_true) { + /* + * Check if copy literal area GC is needed and only + * do GC if needed. This check is never requested unless + * we know that this is to much work to do on a normal + * scheduler, so we do not even try to check it here + * but instead unconditionally schedule this as dirty + * work... + */ + if (c_p->flags & F_DISABLE_GC) { + /* We might need to GC, but GC was disabled */ + save_gc_task(c_p, st, st_prio); + st = NULL; + } + else { + c_p->flags |= F_DIRTY_CHECK_CLA; + save_dirty_task(c_p, st); + st = NULL; + erts_schedule_dirty_sys_execution(c_p); + } + } + else { + /* Copy literal area GC needed... */ + if (!ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) + fcalls = reds; + else + fcalls = reds - CONTEXT_REDS; + st_res = erts_copy_literals_gc(c_p, &cla_reds, fcalls); + reds -= cla_reds; + if (is_non_value(st_res)) { + if (c_p->flags & F_DIRTY_CLA) { + save_dirty_task(c_p, st); + st = NULL; + break; + } + /* Needed gc, but gc was disabled */ + save_gc_task(c_p, st, st_prio); + st = NULL; + break; + } + /* We did a major gc */ + minor_gc = major_gc = 1; + } break; } case ERTS_PSTT_FTMQ: @@ -10622,15 +10662,19 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) break; case ERTS_PSTT_PRIO_SIG: { erts_aint32_t fail_state, state; - int sig_res, sig_reds = reds; + int sig_res, sig_reds; st_res = am_false; + ASSERT(!(c_p->sig_qs.flags & FS_FLUSHING_SIGS)); + if (st->arg[0] == am_false) { erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + st->arg[0] = am_true; } + state = erts_atomic32_read_nob(&c_p->state); sig_reds = reds; sig_res = erts_proc_sig_handle_incoming(c_p, &state, &sig_reds, reds, !0); @@ -10644,8 +10688,6 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) if (sig_res) break; - st->arg[0] = am_true; - fail_state = ERTS_PSFLG_EXITING; if (schedule_process_sys_task(c_p, st_prio, st, &fail_state)) { @@ -10656,6 +10698,7 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) state = erts_atomic32_read_nob(&c_p->state); exit_permanent_prio_elevation(c_p, state, st_prio); } + break; } case ERTS_PSTT_TEST: @@ -10811,10 +10854,12 @@ erts_execute_dirty_system_task(Process *c_p) /* * If multiple operations, perform them in the following * order (in order to avoid unnecessary GC): - * 1. Copy Literal Area (implies major GC). - * 2. GC Hibernate (implies major GC if not woken). - * 3. Major GC (implies minor GC). - * 4. Minor GC. + * 1. Check for Copy Literals Area GC need. This may + * trigger a Copy Literals Area GC. + * 2. Copy Literal Area GC (implies major GC). + * 3. GC Hibernate (implies major GC if not woken). + * 4. Major GC (implies minor GC). + * 5. Minor GC. * * System task requests are handled after the actual * operations have been performed... @@ -10822,6 +10867,37 @@ erts_execute_dirty_system_task(Process *c_p) ASSERT(!(c_p->flags & (F_DELAY_GC|F_DISABLE_GC))); + if (c_p->flags & F_DIRTY_CHECK_CLA) { + ErtsLiteralArea *la = ERTS_COPY_LITERAL_AREA(); + + ASSERT(!(c_p->flags & F_DIRTY_CLA)); + c_p->flags &= ~F_DIRTY_CHECK_CLA; + if (!la) + cla_res = am_ok; + else { + int check_cla_reds = 0; + char *literals = (char *) &la->start[0]; + Uint lit_bsize = (char *) la->end - literals; + if (erts_check_copy_literals_gc_need(c_p, + &check_cla_reds, + literals, + lit_bsize)) { + /* + * We had references to this literal area on the heap; + * need a copy literals GC... + */ + c_p->flags |= F_DIRTY_CLA; + } + else { + /* + * We have no references to this literal area on the + * heap; no copy literals GC needed... + */ + cla_res = am_ok; + } + } + } + if (c_p->flags & F_DIRTY_CLA) { int cla_reds = 0; cla_res = erts_copy_literals_gc(c_p, &cla_reds, c_p->fcalls); @@ -10850,7 +10926,8 @@ erts_execute_dirty_system_task(Process *c_p) c_p->arity, c_p->fcalls); } - ASSERT(!(c_p->flags & (F_DIRTY_CLA + ASSERT(!(c_p->flags & (F_DIRTY_CHECK_CLA + | F_DIRTY_CLA | F_DIRTY_GC_HIBERNATE | F_DIRTY_MAJOR_GC | F_DIRTY_MINOR_GC))); @@ -11234,7 +11311,7 @@ erts_internal_request_system_task_4(BIF_ALIST_4) } void -erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id) +erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id, int check) { Process *rp; ErtsProcSysTask *st; @@ -11260,7 +11337,8 @@ erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id) req_id_sz, &hp, &st->off_heap); - for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) + st->arg[0] = check ? am_true : am_false; + for (i = 1; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) st->arg[i] = THE_NON_VALUE; rp = erts_proc_lookup_raw(to); diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 81958e373a..8dc55a8da0 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1246,8 +1246,9 @@ void erts_check_for_holes(Process* p); process table. Always ACTIVE while EXITING. Never SUSPENDED unless also FREE. */ #define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(5) -/* UNUSED */ -#define ERTS_PSFLG_UNUSED ERTS_PSFLG_BIT(6) +/* MAYBE_SELF_SIGS - We might have outstanding signals + from ourselves to ourselvs. */ +#define ERTS_PSFLG_MAYBE_SELF_SIGS ERTS_PSFLG_BIT(6) /* ACTIVE - Process "wants" to execute */ #define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(7) /* IN_RUNQ - Real process (not proxy) struct used in a @@ -1567,15 +1568,18 @@ extern int erts_system_profile_ts_type; #define F_TRAP_EXIT (1 << 22) /* Trapping exit */ #define F_FRAGMENTED_SEND (1 << 23) /* Process is doing a distributed fragmented send */ #define F_DBG_FORCED_TRAP (1 << 24) /* DEBUG: Last BIF call was a forced trap */ +#define F_DIRTY_CHECK_CLA (1 << 25) /* Check if copy literal area GC scheduled */ /* Signal queue flags */ #define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */ #define FS_ON_HEAP_MSGQ (1 << 1) /* On heap msg queue */ #define FS_OFF_HEAP_MSGQ_CHNG (1 << 2) /* Off heap msg queue changing */ -#define FS_LOCAL_SIGS_ONLY (1 << 3) /* Handle privq sigs only */ +#define FS_UNUSED (1 << 3) /* Unused */ #define FS_HANDLING_SIGS (1 << 4) /* Process is handling signals */ #define FS_WAIT_HANDLE_SIGS (1 << 5) /* Process is waiting to handle signals */ #define FS_DELAYED_PSIGQS_LEN (1 << 6) /* Delayed update of sig_qs.len */ +#define FS_FLUSHING_SIGS (1 << 7) /* Currently flushing signals */ +#define FS_FLUSHED_SIGS (1 << 8) /* Flushing of signals completed */ /* * F_DISABLE_GC and F_DELAY_GC are similar. Both will prevent @@ -1931,7 +1935,7 @@ void erts_schedule_thr_prgr_later_cleanup_op(void (*)(void *), ErtsThrPrgrLaterOp *, UWord); void erts_schedule_complete_off_heap_message_queue_change(Eterm pid); -void erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id); +void erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id, int check); struct db_fixation; void erts_schedule_ets_free_fixation(Eterm pid, struct db_fixation*); void erts_schedule_flush_trace_messages(Process *proc, int force_on_proc); diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c index f95c7ad1d7..9fa5969b5e 100644 --- a/erts/emulator/beam/erl_process_dump.c +++ b/erts/emulator/beam/erl_process_dump.c @@ -159,9 +159,12 @@ Uint erts_process_memory(Process *p, int include_sigs_in_transit) * Size of message queue plus size of all signals * in transit to the process! */ - erts_proc_sig_queue_lock(p); - erts_proc_sig_fetch(p); - erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + if (!(p->sig_qs.flags & FS_FLUSHING_SIGS) + || ERTS_IS_CRASH_DUMPING) { + erts_proc_sig_queue_lock(p); + erts_proc_sig_fetch(p); + erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + } ERTS_FOREACH_SIG_PRIVQS( p, mp, @@ -228,7 +231,9 @@ dump_process_info(fmtfn_t to, void *to_arg, Process *p) if (ERTS_TRACE_FLAGS(p) & F_SENSITIVE) return; - erts_proc_sig_fetch(p); + if (!(p->sig_qs.flags & FS_FLUSHING_SIGS) || ERTS_IS_CRASH_DUMPING) { + erts_proc_sig_fetch(p); + } if (p->sig_qs.first || p->sig_qs.cont) { erts_print(to, to_arg, "=proc_messages:%T\n", p->common.id); @@ -1123,8 +1128,8 @@ erts_dump_extended_process_state(fmtfn_t to, void *to_arg, erts_aint32_t psflg) erts_print(to, to_arg, "FREE"); break; case ERTS_PSFLG_EXITING: erts_print(to, to_arg, "EXITING"); break; - case ERTS_PSFLG_UNUSED: - erts_print(to, to_arg, "UNUSED"); break; + case ERTS_PSFLG_MAYBE_SELF_SIGS: + erts_print(to, to_arg, "MAYBE_SELF_SIGS"); break; case ERTS_PSFLG_ACTIVE: erts_print(to, to_arg, "ACTIVE"); break; case ERTS_PSFLG_IN_RUNQ: diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index f892d0dd89..f54596cf58 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -957,6 +957,7 @@ Eterm erl_is_function(Process* p, Eterm arg1, Eterm arg2); Eterm erts_check_process_code(Process *c_p, Eterm module, int *redsp, int fcalls); #define ERTS_CLA_SCAN_WORDS_PER_RED 512 +int erts_check_copy_literals_gc_need_max_reds(Process *c_p); int erts_check_copy_literals_gc_need(Process *c_p, int *redsp, char *literals, Uint lit_bsize); Eterm erts_copy_literals_gc(Process *c_p, int *redsp, int fcalls); diff --git a/erts/emulator/test/bif_SUITE.erl b/erts/emulator/test/bif_SUITE.erl index c5be79528b..ee6c494145 100644 --- a/erts/emulator/test/bif_SUITE.erl +++ b/erts/emulator/test/bif_SUITE.erl @@ -37,6 +37,7 @@ error_stacktrace_during_call_trace/1, group_leader_prio/1, group_leader_prio_dirty/1, is_process_alive/1, + is_process_alive_signal_from/1, process_info_blast/1, os_env_case_sensitivity/1, verify_middle_queue_save/1, @@ -57,7 +58,8 @@ all() -> erl_crash_dump_bytes, min_max, erlang_halt, is_builtin, error_stacktrace, error_stacktrace_during_call_trace, group_leader_prio, group_leader_prio_dirty, - is_process_alive, process_info_blast, os_env_case_sensitivity, + is_process_alive, is_process_alive_signal_from, + process_info_blast, os_env_case_sensitivity, verify_middle_queue_save, test_length,fixed_apply_badarg, external_fun_apply3]. @@ -1205,6 +1207,51 @@ is_process_alive(Config) when is_list(Config) -> Ps), ok. +is_process_alive_signal_from(Config) when is_list(Config) -> + process_flag(priority, high), + process_flag(scheduler, 1), + Schdlr = case erlang:system_info(schedulers_online) of + 1 -> 1; + _ -> 2 + end, + X = is_process_alive_signal_from_test(100000, 0, Schdlr), + erlang:display({exits_detected, X}), + {comment, integer_to_list(X) ++ " exited processes detected"}. + +is_process_alive_signal_from_test(0, X, _Schdlr) -> + X; +is_process_alive_signal_from_test(N, X, Schdlr) -> + Tester = self(), + {Testee, TMon} = spawn_opt(fun () -> + Mon = erlang:monitor(process, Tester), + Tester ! {self(), ready}, + busy_wait_go(), + _ = erlang:demonitor(Mon), + exit(normal) + end, + [link, + monitor, + {priority, high}, + {scheduler, Schdlr}]), + receive {Testee, ready} -> ok end, + {monitored_by, MBList1} = process_info(self(), monitored_by), + true = lists:member(Testee, MBList1), + erlang:yield(), + Testee ! {go, ok}, + erlang:yield(), + NewX = case erlang:is_process_alive(Testee) of + true -> + X; + false -> + %% Demonitor signal should have reached us before the + %% is-process-alive reply... + {monitored_by, MBList2} = process_info(self(), monitored_by), + false = lists:member(Testee, MBList2), + X+1 + end, + receive {'DOWN', TMon, process, Testee, normal} -> ok end, + is_process_alive_signal_from_test(N-1, NewX, Schdlr). + process_info_blast(Config) when is_list(Config) -> Tester = self(), NoAttackers = 1000, @@ -1396,7 +1443,16 @@ external_fun_apply3(_Config) -> ok. %% helpers - + +busy_wait_go() -> + receive + {go, Info} -> + Info + after + 0 -> + busy_wait_go() + end. + id(I) -> I. %% Get code path, including the path for the erts application. diff --git a/erts/emulator/test/literal_area_collector_test.erl b/erts/emulator/test/literal_area_collector_test.erl index 14583db88d..d0b6fb3822 100644 --- a/erts/emulator/test/literal_area_collector_test.erl +++ b/erts/emulator/test/literal_area_collector_test.erl @@ -28,38 +28,29 @@ check_idle(Timeout) when is_integer(Timeout) > 0 -> ScaledTimeout = Timeout*test_server:timetrap_scale_factor(), Pid = find_literal_area_collector(), Start = erlang:monotonic_time(millisecond), - try - wait_for_idle_literal_collector(Pid, Start, ScaledTimeout, -1, 0) - catch - throw:done -> - ok - end. + Alias = alias(), + wait_for_idle_literal_collector(Pid, Alias, Start, ScaledTimeout). -wait_for_idle_literal_collector(Pid, Start, Timeout, NWaiting, WRedsStart) -> - {W, R} = case process_info(Pid, [status, reductions]) of - [{status, waiting}, {reductions, Reds}] -> - %% Assume that reds aren't bumped more than - %% 2 in order to service this process info - %% request... - case {NWaiting > 100, Reds - WRedsStart =< 2*NWaiting} of - {true, true} -> - throw(done); - {false, true} -> - {NWaiting+1, WRedsStart}; - _ -> - {0, Reds} - end; - _ -> - {-1, 0} - end, +wait_for_idle_literal_collector(Pid, Alias, Start, Timeout) -> + Ref = make_ref(), + Pid ! {get_status, Ref, Alias}, Now = erlang:monotonic_time(millisecond), - if Now - Start > Timeout -> - error({busy_literal_area_collecor_timout, Timeout}); - true -> - ok - end, - receive after 1 -> ok end, - wait_for_idle_literal_collector(Pid, Start, Timeout, W, R). + TMO = case Start + Timeout - Now of + TimeLeft when TimeLeft < 0 -> 0; + TimeLeft -> TimeLeft + end, + receive + {Ref, idle} -> + unalias(Alias), + ok; + {Ref, _} -> + receive after 10 -> ok end, + wait_for_idle_literal_collector(Pid, Alias, Start, Timeout) + after TMO -> + unalias(Alias), + receive {Ref, _} -> ok after 0 -> ok end, + error({busy_literal_area_collecor_timout, Timeout}) + end. find_literal_area_collector() -> case get('__literal_area_collector__') of diff --git a/erts/emulator/test/nif_SUITE.erl b/erts/emulator/test/nif_SUITE.erl index 4aa9cf6b9f..2f28b4a0e9 100644 --- a/erts/emulator/test/nif_SUITE.erl +++ b/erts/emulator/test/nif_SUITE.erl @@ -1152,6 +1152,7 @@ monitor_process_c(Config) -> Pid = spawn_link(fun() -> R_ptr = alloc_monitor_resource_nif(), {0,Mon} = monitor_process_nif(R_ptr, self(), true, Papa), + receive after 1000 -> ok end, [R_ptr] = monitored_by(self()), put(store, make_resource(R_ptr)), ok = release_resource(R_ptr), @@ -1159,8 +1160,8 @@ monitor_process_c(Config) -> Papa ! {self(), done, R_ptr, Mon}, exit end), - [{Pid, done, R_ptr, Mon1}, - {monitor_resource_down, R_ptr, Pid, Mon2}] = flush(2), + receive {Pid, done, R_ptr, Mon1} -> ok end, + [{monitor_resource_down, R_ptr, Pid, Mon2}] = flush(1), compare_monitors_nif(Mon1, Mon2), {R_ptr, _, 1} = last_resource_dtor_call(), ok. diff --git a/erts/emulator/test/port_SUITE.erl b/erts/emulator/test/port_SUITE.erl index 4a3ecef397..1fa6922648 100644 --- a/erts/emulator/test/port_SUITE.erl +++ b/erts/emulator/test/port_SUITE.erl @@ -1915,6 +1915,7 @@ otp_5112(Config) when is_list(Config) -> true = lists:member(Port, Links1), Port ! {self(), {command, ""}}, wait_until(fun () -> lists:member(Port, erlang:ports()) == false end), + receive after 1000 -> ok end, %% Give signal some time to propagate... {links, Links2} = process_info(self(),links), io:format("Links2: ~p~n",[Links2]), false = lists:member(Port, Links2), %% This used to fail diff --git a/erts/emulator/test/process_SUITE.erl b/erts/emulator/test/process_SUITE.erl index 0e549d424c..23a208adb2 100644 --- a/erts/emulator/test/process_SUITE.erl +++ b/erts/emulator/test/process_SUITE.erl @@ -49,6 +49,10 @@ process_info_smoke_all/1, process_info_status_handled_signal/1, process_info_reductions/1, + process_info_self_signal/1, + process_info_self_msgq_len/1, + process_info_self_msgq_len_messages/1, + process_info_self_msgq_len_more/1, bump_reductions/1, low_prio/1, binary_owner/1, yield/1, yield2/1, otp_4725/1, dist_unlink_ack_exit_leak/1, bad_register/1, garbage_collect/1, otp_6237/1, @@ -107,21 +111,8 @@ suite() -> all() -> [spawn_with_binaries, t_exit_1, {group, t_exit_2}, trap_exit_badarg, trap_exit_badarg_in_bif, - t_process_info, process_info_other, process_info_other_msg, - process_info_other_dist_msg, process_info_other_status, - process_info_2_list, - process_info_lock_reschedule, - process_info_lock_reschedule2, - process_info_lock_reschedule3, - process_info_other_message_queue_len_signal_race, - process_info_garbage_collection, - process_info_parent, - process_info_smoke_all, - process_info_status_handled_signal, - process_info_reductions, bump_reductions, low_prio, yield, yield2, otp_4725, - dist_unlink_ack_exit_leak, - bad_register, garbage_collect, process_info_messages, + dist_unlink_ack_exit_leak, bad_register, garbage_collect, process_flag_badarg, process_flag_fullsweep_after, process_flag_heap_size, command_line_max_heap_size, @@ -129,6 +120,7 @@ all() -> spawn_huge_arglist, otp_6237, {group, spawn_request}, + {group, process_info_bif}, {group, processes_bif}, {group, otp_7738}, garb_other_running, {group, system_task}, @@ -159,6 +151,24 @@ groups() -> processes_small_tab, processes_this_tab, processes_last_call_trap, processes_apply_trap, processes_gc_trap, processes_term_proc_list]}, + {process_info_bif, [], + [t_process_info, process_info_messages, + process_info_other, process_info_other_msg, + process_info_other_message_queue_len_signal_race, + process_info_other_dist_msg, process_info_other_status, + process_info_2_list, + process_info_lock_reschedule, + process_info_lock_reschedule2, + process_info_lock_reschedule3, + process_info_garbage_collection, + process_info_parent, + process_info_smoke_all, + process_info_status_handled_signal, + process_info_reductions, + process_info_self_signal, + process_info_self_msgq_len, + process_info_self_msgq_len_messages, + process_info_self_msgq_len_more]}, {otp_7738, [], [otp_7738_waiting, otp_7738_suspended, otp_7738_resume]}, @@ -1351,6 +1361,146 @@ pi_reductions_main_unlocker_loop(Other) -> erlang:yield(), pi_reductions_main_unlocker_loop(Other). +process_info_self_signal(Config) when is_list(Config) -> + %% Test that signals that we have sent to ourselves are + %% visible in process_info() result. This is not strictly + %% a necessary property, but implemented so now. See + %% process_info.c:process_info_bif() for more info. + Self = self(), + Ref = make_ref(), + pi_sig_spam_test(fun () -> + process_info_self_signal_spammer(Self) + end, + fun () -> + self() ! Ref, + process_info(self(), messages) + end, + fun (Res) -> + {messages, [Ref]} = Res + end). + +process_info_self_signal_spammer(To) -> + erlang:demonitor(erlang:monitor(process, To)), + process_info_self_signal_spammer(To). + +process_info_self_msgq_len(Config) when is_list(Config) -> + %% Spam ourselves with signals forcing us to flush own + %% signal queue.. + Self = self(), + pi_sig_spam_test(fun () -> + process_info_self_msgq_len_spammer(Self) + end, + fun () -> + process_info(self(), message_queue_len) + end, + fun (Res) -> + {message_queue_len, Len} = Res, + true = Len > 0, + ok + end). + + +process_info_self_msgq_len_messages(Config) when is_list(Config) -> + %% Spam ourselves with signals normally forcing us to flush own + %% signal queue, but since we also want messages wont be flushed... + Self = self(), + pi_sig_spam_test(fun () -> + process_info_self_msgq_len_spammer(Self, 100000) + end, + fun () -> + process_info(self(), + [message_queue_len, + messages]) + end, + fun (Res) -> + [{message_queue_len, Len}, + {messages, Msgs}] = Res, + Len = length(Msgs), + ok + end). + +process_info_self_msgq_len_more(Config) when is_list(Config) -> + self() ! hej, + BodyRes = process_info_self_msgq_len_more_caller_body(), + ok = process_info_self_msgq_len_more_caller_body_result(BodyRes), + TailRes = process_info_self_msgq_len_more_caller_tail(), + ok = process_info_self_msgq_len_more_caller_tail_result(TailRes), + receive hej -> ok end, + %% Check that current_function, current_location, and + %% current_stacktrace give sane results flushing or not... + Self = self(), + pi_sig_spam_test(fun () -> + process_info_self_msgq_len_spammer(Self) + end, + fun process_info_self_msgq_len_more_caller_body/0, + fun process_info_self_msgq_len_more_caller_body_result/1), + pi_sig_spam_test(fun () -> + process_info_self_msgq_len_spammer(Self) + end, + fun process_info_self_msgq_len_more_caller_tail/0, + fun process_info_self_msgq_len_more_caller_tail_result/1). + +process_info_self_msgq_len_more_caller_body() -> + Res = process_info(self(), + [message_queue_len, + current_function, + current_location, + current_stacktrace]), + id(Res). + +process_info_self_msgq_len_more_caller_body_result(Res) -> + [{message_queue_len, Len}, + {current_function, {process_SUITE,process_info_self_msgq_len_more_caller_body,0}}, + {current_location, {process_SUITE,process_info_self_msgq_len_more_caller_body,0,_}}, + {current_stacktrace, + [{process_SUITE,process_info_self_msgq_len_more_caller_body,0,_} | _]}] = Res, + true = Len > 0, + ok. + +process_info_self_msgq_len_more_caller_tail() -> + process_info(self(), + [message_queue_len, + current_function, + current_location, + current_stacktrace]). + +process_info_self_msgq_len_more_caller_tail_result(Res) -> + [{message_queue_len, Len}, + {current_function, {process_SUITE,process_info_self_msgq_len_more_caller_tail,0}}, + {current_location, {process_SUITE,process_info_self_msgq_len_more_caller_tail,0,_}}, + {current_stacktrace, + [{process_SUITE,process_info_self_msgq_len_more_caller_tail,0,_} | _]}] = Res, + true = Len > 0, + ok. + + +process_info_self_msgq_len_spammer(To) -> + process_info_self_msgq_len_spammer(To, 10000000). + +process_info_self_msgq_len_spammer(_To, 0) -> + ok; +process_info_self_msgq_len_spammer(To, N) -> + To ! hejhopp, + erlang:demonitor(erlang:monitor(process, To)), + process_info_self_msgq_len_spammer(To, N-1). + +pi_sig_spam_test(SpamFun, PITest, PICheckRes) -> + SO = erlang:system_flag(schedulers_online, 1), + try + Self = self(), + SigSpammer = spawn_link(SpamFun), + process_flag(priority, low), + receive after 10 -> ok end, + Res = PITest(), + process_flag(priority, high), + unlink(SigSpammer), + exit(SigSpammer, kill), + false = is_process_alive(SigSpammer), + PICheckRes(Res) + after + _ = erlang:system_flag(schedulers_online, SO) + end. + %% Tests erlang:bump_reductions/1. bump_reductions(Config) when is_list(Config) -> diff --git a/erts/emulator/test/signal_SUITE.erl b/erts/emulator/test/signal_SUITE.erl index e1fcd3fbd4..ec2cbfb80e 100644 --- a/erts/emulator/test/signal_SUITE.erl +++ b/erts/emulator/test/signal_SUITE.erl @@ -32,6 +32,7 @@ -include_lib("common_test/include/ct.hrl"). -export([all/0, suite/0,init_per_suite/1, end_per_suite/1]). -export([init_per_testcase/2, end_per_testcase/2]). +-export([groups/0, init_per_group/2, end_per_group/2]). % Test cases -export([xm_sig_order/1, @@ -46,7 +47,19 @@ monitor_order/1, monitor_named_order_local/1, monitor_named_order_remote/1, - monitor_nodes_order/1]). + monitor_nodes_order/1, + move_msgs_off_heap_signal_basic/1, + move_msgs_off_heap_signal_recv/1, + move_msgs_off_heap_signal_exit/1, + move_msgs_off_heap_signal_recv_exit/1, + copy_literal_area_signal_basic/1, + copy_literal_area_signal_recv/1, + copy_literal_area_signal_exit/1, + copy_literal_area_signal_recv_exit/1, + simultaneous_signals_basic/1, + simultaneous_signals_recv/1, + simultaneous_signals_exit/1, + simultaneous_signals_recv_exit/1]). -export([spawn_spammers/3]). @@ -79,7 +92,29 @@ all() -> monitor_order, monitor_named_order_local, monitor_named_order_remote, - monitor_nodes_order]. + monitor_nodes_order, + {group, adjust_message_queue}]. + +groups() -> + [{adjust_message_queue, [], + [move_msgs_off_heap_signal_basic, + move_msgs_off_heap_signal_recv, + move_msgs_off_heap_signal_exit, + move_msgs_off_heap_signal_recv_exit, + copy_literal_area_signal_basic, + copy_literal_area_signal_recv, + copy_literal_area_signal_exit, + copy_literal_area_signal_recv_exit, + simultaneous_signals_basic, + simultaneous_signals_recv, + simultaneous_signals_exit, + simultaneous_signals_recv_exit]}]. + +init_per_group(_GroupName, Config) -> + Config. + +end_per_group(_GroupName, Config) -> + Config. %% Test that exit signals and messages are received in correct order xm_sig_order(Config) when is_list(Config) -> @@ -706,6 +741,281 @@ receive_filter_spam() -> M -> M end. +move_msgs_off_heap_signal_basic(Config) when is_list(Config) -> + move_msgs_off_heap_signal_test(false, false). + +move_msgs_off_heap_signal_recv(Config) when is_list(Config) -> + move_msgs_off_heap_signal_test(true, false). + +move_msgs_off_heap_signal_exit(Config) when is_list(Config) -> + move_msgs_off_heap_signal_test(false, true). + +move_msgs_off_heap_signal_recv_exit(Config) when is_list(Config) -> + move_msgs_off_heap_signal_test(true, true). + +move_msgs_off_heap_signal_test(RecvPair, Exit) -> + erlang:trace(new_processes, true, [running_procs]), + SFact = test_server:timetrap_scale_factor(), + GoTime = erlang:monotonic_time(millisecond) + 1000*SFact, + ProcF = fun () -> + Now = erlang:monotonic_time(millisecond), + Tmo = case GoTime - Now of + Left when Left < 0 -> + erlang:display({go_time_passed, Left}), + 0; + Left -> + Left + end, + receive after Tmo -> ok end, + on_heap = process_flag(message_queue_data, off_heap), + if RecvPair -> receive_integer_pairs(infinity); + true -> receive after infinity -> ok end + end + end, + Ps = lists:map(fun (_) -> + spawn_opt(ProcF, + [link, + {message_queue_data, on_heap}]) + end, lists:seq(1, 100)), + lists:foreach(fun (P) -> + lists:foreach(fun (N) when N rem 100 == 0 -> + P ! [N|N]; + (N) -> + P ! N + end, lists:seq(1, 10000)) + end, Ps), + Now = erlang:monotonic_time(millisecond), + Tmo = case GoTime - Now + 10 of + Left when Left < 0 -> + erlang:display({go_time_passed, Left}), + 0; + Left -> + Left + end, + receive after Tmo -> ok end, + if Exit -> + _ = lists:foldl(fun (P, N) when N rem 10 -> + unlink(P), + exit(P, terminated), + N+1; + (_P, N) -> + N+1 + end, + 0, + Ps), + ok; + true -> + ok + end, + wait_traced_not_running(1000 + 200*SFact), + erlang:trace(new_processes, false, [running_procs]), + lists:foreach(fun (P) -> + unlink(P), + exit(P, kill) + end, Ps), + lists:foreach(fun (P) -> + false = is_process_alive(P) + end, Ps), + ok. + +copy_literal_area_signal_basic(Config) when is_list(Config) -> + copy_literal_area_signal_test(false, false). + +copy_literal_area_signal_recv(Config) when is_list(Config) -> + copy_literal_area_signal_test(true, false). + +copy_literal_area_signal_exit(Config) when is_list(Config) -> + copy_literal_area_signal_test(false, true). + +copy_literal_area_signal_recv_exit(Config) when is_list(Config) -> + copy_literal_area_signal_test(true, true). + +copy_literal_area_signal_test(RecvPair, Exit) -> + persistent_term:put({?MODULE, ?FUNCTION_NAME}, make_ref()), + Literal = persistent_term:get({?MODULE, ?FUNCTION_NAME}), + true = is_reference(Literal), + 0 = erts_debug:size_shared(Literal), %% Should be a literal... + ProcF = fun () -> + 0 = erts_debug:size_shared(Literal), %% Should be a literal... + if RecvPair -> + receive receive_pairs -> ok end, + receive_integer_pairs(0); + true -> + ok + end, + receive check_literal_conversion -> ok end, + receive + Literal -> + %% Should not be a literal anymore... + false = (0 == erts_debug:size_shared(Literal)) + end + end, + PMs = lists:map(fun (_) -> + spawn_opt(ProcF, [link, monitor]) + end, lists:seq(1, 100)), + lists:foreach(fun ({P,_M}) -> + lists:foreach(fun (N) when N rem 100 == 0 -> + P ! [N|N]; + (N) -> + P ! N + end, lists:seq(1, 10000)), + P ! Literal + end, PMs), + persistent_term:erase({?MODULE, ?FUNCTION_NAME}), + receive after 1 -> ok end, + if RecvPair -> + lists:foreach(fun ({P,_M}) -> + P ! receive_pairs + end, PMs); + true -> + ok + end, + if Exit -> + _ = lists:foldl(fun ({P, _M}, N) when N rem 10 -> + unlink(P), + exit(P, terminated), + N+1; + (_PM, N) -> + N+1 + end, + 0, + PMs), + ok; + true -> + ok + end, + literal_area_collector_test:check_idle(), + lists:foreach(fun ({P,_M}) -> + P ! check_literal_conversion + end, PMs), + lists:foreach(fun ({P, M}) -> + receive + {'DOWN', M, process, P, R} -> + case R of + normal -> ok; + terminated -> ok + end + end + end, PMs), + ok. + +simultaneous_signals_basic(Config) when is_list(Config) -> + simultaneous_signals_test(false, false). + +simultaneous_signals_recv(Config) when is_list(Config) -> + simultaneous_signals_test(true, false). + +simultaneous_signals_exit(Config) when is_list(Config) -> + simultaneous_signals_test(false, true). + +simultaneous_signals_recv_exit(Config) when is_list(Config) -> + simultaneous_signals_test(true, true). + +simultaneous_signals_test(RecvPairs, Exit) -> + erlang:trace(new_processes, true, [running_procs]), + persistent_term:put({?MODULE, ?FUNCTION_NAME}, make_ref()), + Literal = persistent_term:get({?MODULE, ?FUNCTION_NAME}), + true = is_reference(Literal), + 0 = erts_debug:size_shared(Literal), %% Should be a literal... + SFact = test_server:timetrap_scale_factor(), + GoTime = erlang:monotonic_time(millisecond) + 1000*SFact, + ProcF = fun () -> + 0 = erts_debug:size_shared(Literal), %% Should be a literal... + Now = erlang:monotonic_time(millisecond), + Tmo = case GoTime - Now of + Left when Left < 0 -> + erlang:display({go_time_passed, Left}), + 0; + Left -> + Left + end, + receive after Tmo -> ok end, + on_heap = process_flag(message_queue_data, off_heap), + if RecvPairs -> receive_integer_pairs(0); + true -> ok + end, + receive check_literal_conversion -> ok end, + receive + Literal -> + %% Should not be a literal anymore... + false = (0 == erts_debug:size_shared(Literal)) + end + end, + PMs = lists:map(fun (_) -> + spawn_opt(ProcF, + [link, + monitor, + {message_queue_data, on_heap}]) + end, lists:seq(1, 100)), + lists:foreach(fun ({P,_M}) -> + lists:foreach(fun (N) when N rem 100 == 0 -> + P ! [N|N]; + (N) -> + P ! N + end, lists:seq(1, 10000)), + P ! Literal + end, PMs), + Now = erlang:monotonic_time(millisecond), + Tmo = case GoTime - Now - 5 of % a bit earlier... + Left when Left < 0 -> + erlang:display({go_time_passed, Left}), + 0; + Left -> + Left + end, + receive after Tmo -> ok end, + persistent_term:erase({?MODULE, ?FUNCTION_NAME}), + receive after 10 -> ok end, + if Exit -> + _ = lists:foldl(fun ({P, _M}, N) when N rem 10 -> + unlink(P), + exit(P, terminated), + N+1; + (_PM, N) -> + N+1 + end, + 0, + PMs), + ok; + true -> + ok + end, + wait_traced_not_running(1000 + 200*SFact), + erlang:trace(new_processes, false, [running_procs]), + literal_area_collector_test:check_idle(), + lists:foreach(fun ({P,_M}) -> + P ! check_literal_conversion + end, PMs), + lists:foreach(fun ({P, M}) -> + receive + {'DOWN', M, process, P, R} -> + case R of + normal -> ok; + terminated -> ok + end + end + end, PMs), + ok. + + +wait_traced_not_running(Tmo) -> + receive + {trace,_,What,_} when What == in; + What == out -> + wait_traced_not_running(Tmo) + after + Tmo -> + ok + end. + +receive_integer_pairs(Tmo) -> + receive + [N|N] -> + receive_integer_pairs(Tmo) + after + Tmo -> + ok + end. %% %% -- Internal utils -------------------------------------------------------- diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in index fa215618aa..64d8de67df 100644 --- a/erts/etc/unix/etp-commands.in +++ b/erts/etc/unix/etp-commands.in @@ -2479,7 +2479,7 @@ define etp-proc-state-int printf "active | " end if ($arg0 & 0x1000) - printf "unused | " + printf "maybe-self-sigs | " end if ($arg0 & 0x800) printf "exiting | " diff --git a/erts/preloaded/ebin/erts_literal_area_collector.beam b/erts/preloaded/ebin/erts_literal_area_collector.beam Binary files differindex d54baaf4a2..165bfa4622 100644 --- a/erts/preloaded/ebin/erts_literal_area_collector.beam +++ b/erts/preloaded/ebin/erts_literal_area_collector.beam diff --git a/erts/preloaded/src/erts_literal_area_collector.erl b/erts/preloaded/src/erts_literal_area_collector.erl index 8a73ed1685..bb2ad6b919 100644 --- a/erts/preloaded/src/erts_literal_area_collector.erl +++ b/erts/preloaded/src/erts_literal_area_collector.erl @@ -62,39 +62,46 @@ msg_loop(Area, {Ongoing, NeedIReq} = OReqInfo, GcOutstnd, NeedGC) -> switch_area(); %% Process (_Pid) has completed the request... - {copy_literals, {Area, _GcAllowed, _Pid}, ok} when Ongoing == 1, - NeedIReq == [] -> + {copy_literals, {Area, _ReqType, _Pid}, ok} when Ongoing == 1, + NeedIReq == [] -> switch_area(); %% Last process completed... - {copy_literals, {Area, false, _Pid}, ok} -> + {copy_literals, {Area, init, _Pid}, ok} -> msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq), GcOutstnd, NeedGC); - {copy_literals, {Area, true, _Pid}, ok} when NeedGC == [] -> + {copy_literals, {Area, ReqType, _Pid}, ok} when NeedGC == [], + ReqType /= init -> msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq), GcOutstnd-1, []); - {copy_literals, {Area, true, _Pid}, ok} -> - send_copy_req(hd(NeedGC), Area, true), - msg_loop(Area, {Ongoing-1, NeedIReq}, GcOutstnd, tl(NeedGC)); + {copy_literals, {Area, ReqType, _Pid}, ok} when ReqType /= init -> + [{GCPid,GCWork} | NewNeedGC] = NeedGC, + send_copy_req(GCPid, Area, GCWork), + msg_loop(Area, {Ongoing-1, NeedIReq}, GcOutstnd, NewNeedGC); %% Process (Pid) failed to complete the request %% since it needs to garbage collect in order to %% complete the request... - {copy_literals, {Area, false, Pid}, need_gc} when GcOutstnd < ?MAX_GC_OUTSTND -> - send_copy_req(Pid, Area, true), + {copy_literals, {Area, init, Pid}, GCWork} when GcOutstnd + < ?MAX_GC_OUTSTND -> + send_copy_req(Pid, Area, GCWork), msg_loop(Area, OReqInfo, GcOutstnd+1, NeedGC); - {copy_literals, {Area, false, Pid}, need_gc} -> + {copy_literals, {Area, init, Pid}, GCWork} -> msg_loop(Area, check_send_copy_req(Area, Ongoing, NeedIReq), - GcOutstnd, [Pid|NeedGC]); + GcOutstnd, [{Pid,GCWork} | NeedGC]); %% Not handled message regarding the area that we %% currently are working with. Crash the VM so %% we notice this bug... - {copy_literals, {Area, _, _}, _} = Msg when erlang:is_reference(Area) -> + {copy_literals, {Area, _, _}, _} = Msg -> exit({not_handled_message, Msg}); {change_prio, From, Ref, Prio} -> change_prio(From, Ref, Prio), msg_loop(Area, OReqInfo, GcOutstnd, NeedGC); + {get_status, Ref, From} when is_pid(From); is_reference(From) -> + From ! {Ref, if Ongoing == 0 -> idle; true -> working end}, + msg_loop(Area, OReqInfo, GcOutstnd, NeedGC); + %% Unexpected garbage message. Get rid of it... _Ignore -> msg_loop(Area, OReqInfo, GcOutstnd, NeedGC) @@ -126,7 +133,7 @@ switch_area() -> check_send_copy_req(_Area, Ongoing, []) -> {Ongoing, []}; check_send_copy_req(Area, Ongoing, [Pid|Pids]) -> - send_copy_req(Pid, Area, false), + send_copy_req(Pid, Area, init), {Ongoing+1, Pids}. send_copy_reqs(Ps, Area, OReqLim) -> @@ -137,23 +144,23 @@ send_copy_reqs([], _Area, _OReqLim, N) -> send_copy_reqs(Ps, _Area, OReqLim, N) when N >= OReqLim -> {N, Ps}; send_copy_reqs([P|Ps], Area, OReqLim, N) -> - send_copy_req(P, Area, false), + send_copy_req(P, Area, init), send_copy_reqs(Ps, Area, OReqLim, N+1). -send_copy_req(P, Area, GC) -> - erts_literal_area_collector:send_copy_request(P, Area, GC). +send_copy_req(P, Area, How) -> + erts_literal_area_collector:send_copy_request(P, Area, How). -spec release_area_switch() -> boolean(). release_area_switch() -> erlang:nif_error(undef). %% Implemented in beam_bif_load.c --spec send_copy_request(To, AreaId, GcAllowed) -> 'ok' when +-spec send_copy_request(To, AreaId, How) -> 'ok' when To :: pid(), AreaId :: term(), - GcAllowed :: boolean(). + How :: 'init' | 'check_gc' | 'need_gc'. -send_copy_request(_To, _AreaId, _GcAllowed) -> +send_copy_request(_To, _AreaId, _How) -> erlang:nif_error(undef). %% Implemented in beam_bif_load.c change_prio(From, Ref, Prio) -> |