diff options
-rw-r--r-- | erts/emulator/beam/atom.names | 1 | ||||
-rw-r--r-- | erts/emulator/beam/beam_bif_load.c | 61 | ||||
-rw-r--r-- | erts/emulator/beam/erl_alloc.types | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.h | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 554 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.h | 7 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 134 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 3 | ||||
-rw-r--r-- | erts/emulator/beam/global.h | 1 | ||||
-rw-r--r-- | erts/emulator/test/literal_area_collector_test.erl | 51 | ||||
-rw-r--r-- | erts/emulator/test/signal_SUITE.erl | 314 | ||||
-rw-r--r-- | erts/preloaded/ebin/erts_literal_area_collector.beam | bin | 5616 -> 5932 bytes | |||
-rw-r--r-- | erts/preloaded/src/erts_literal_area_collector.erl | 45 |
13 files changed, 994 insertions, 179 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 223603543f..f041227966 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -172,6 +172,7 @@ atom cflags atom CHANGE='CHANGE' atom characters_to_binary_int atom characters_to_list_int +atom check_gc atom clear atom clock_service atom close diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index f910a4e8be..0e33dfa667 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -975,12 +975,39 @@ set_default_trace_pattern(Eterm module) } int -erts_check_copy_literals_gc_need(Process *c_p, int *redsp, - char *literals, Uint lit_bsize) +erts_check_copy_literals_gc_need_max_reds(Process *c_p) { + Uint64 words, reds; + /* - * TODO: Implement yielding support! + * Calculate maximum amount of words that needs + * to be scanned... */ + words = 1; /* fvalue */ + words += c_p->hend - c_p->stop; /* stack */ + words += c_p->htop - c_p->heap; /* new heap */ + if (c_p->abandoned_heap) + words += c_p->heap_sz; /* abandoned heap */ + words += c_p->old_htop - c_p->old_heap; /* old heap */ + if (c_p->dictionary) { + Eterm* start = ERTS_PD_START(c_p->dictionary); + Eterm* end = start + ERTS_PD_SIZE(c_p->dictionary); + + words += end - start; /* dictionary */ + } + words += c_p->mbuf_sz; /* heap and message fragments */ + + /* Convert to reductions... 
*/ + reds = ((words - 1)/ERTS_CLA_SCAN_WORDS_PER_RED) + 1; + if (reds > CONTEXT_REDS) + return CONTEXT_REDS+1; + return (int) reds; +} + +int +erts_check_copy_literals_gc_need(Process *c_p, int *redsp, + char *literals, Uint lit_bsize) +{ ErlHeapFragment *hfrag; ErtsMessage *mfp; Uint64 scanned = 0; @@ -1529,22 +1556,30 @@ erts_literal_area_collector_send_copy_request_3(BIF_ALIST_3) req_id = TUPLE3(&tmp_heap[0], BIF_ARG_2, BIF_ARG_3, BIF_ARG_1); - if (BIF_ARG_3 == am_false) { + switch (BIF_ARG_3) { + + case am_init: /* - * Will handle signal queue and check if GC if needed. If - * GC is needed operation will be continued by a GC (below). + * Will handle signal queue and if possible check if GC is needed. + * If GC is needed or needs to be checked, the operation will be + * restarted later in the 'check_gc' or 'need_gc' case below... */ erts_proc_sig_send_cla_request(BIF_P, BIF_ARG_1, req_id); - } - else if (BIF_ARG_3 == am_true) { + break; + + case am_check_gc: + case am_need_gc: /* - * Will perform a literal GC. Note that this assumes that - * signal queue already has been handled... + * Will check and/or perform a literal GC. Note that this assumes that + * the signal queue already has been handled by the 'init' case above... 
*/ - erts_schedule_cla_gc(BIF_P, BIF_ARG_1, req_id); - } - else + erts_schedule_cla_gc(BIF_P, BIF_ARG_1, req_id, + BIF_ARG_3 == am_check_gc); + break; + + default: BIF_ERROR(BIF_P, BADARG); + } BIF_RET(am_ok); } diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 4aed5d9b34..24ba019075 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -274,6 +274,7 @@ type THR_PRGR_DATA LONG_LIVED SYSTEM thr_prgr_data type T_THR_PRGR_DATA SHORT_LIVED SYSTEM temp_thr_prgr_data type RELEASE_LAREA SHORT_LIVED SYSTEM release_literal_area type SIG_DATA SHORT_LIVED PROCESSES signal_data +type SIG_YIELD_DATA SHORT_LIVED PROCESSES signal_yield_data type DIST_DEMONITOR SHORT_LIVED PROCESSES dist_demonitor type CML_CLEANUP SHORT_LIVED SYSTEM connection_ml_cleanup type ML_YIELD_STATE SHORT_LIVED SYSTEM monitor_link_yield_state diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 87cd96f4cd..3a7aba5a68 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -284,6 +284,7 @@ typedef struct { typedef struct { ErtsSignal sig; ErtsMessage **prev_next; + signed char is_yield_mark; signed char pass; signed char set_save; signed char in_sigq; diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index bfddf9e48f..5b66c66c96 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -216,10 +216,25 @@ typedef struct { } ErtsProcSigRPC; typedef struct { + ErtsRecvMarker next; + ErtsRecvMarker last; +} ErtsYieldAdjMsgQ; + +typedef struct { + ErtsYieldAdjMsgQ *yield; Eterm requester; Eterm request_id; } ErtsCLAData; +typedef struct { + ErtsYieldAdjMsgQ *yield; +} ErtsAdjOffHeapMsgQData; + +typedef struct { + ErtsMessage *first; + ErtsMessage **last; +} ErtsSavedNMSignals; + static erts_aint32_t wait_handle_signals(Process *c_p); static void wake_handle_signals(Process *proc); 
@@ -243,12 +258,17 @@ static int handle_cla(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig, - int exiting); + int exiting, + int limit, + ErtsSavedNMSignals *saved_nm_sigs); + static int handle_move_msgq_off_heap(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig, - int exiting); + int exiting, + int limit, + ErtsSavedNMSignals *saved_nm_sigs); static void send_cla_reply(Process *c_p, ErtsMessage *sig, Eterm to, Eterm req_id, Eterm result); @@ -293,6 +313,76 @@ proc_sig_hdbg_check_queue(Process *c_p, #define ERTS_PROC_SIG_HDBG_PRIV_CHKQ(P, T, NMN) #endif +static void +save_delayed_nm_signal(ErtsSavedNMSignals *saved_sigs, ErtsMessage *sig) +{ + ErtsSignal *nm_sig = (ErtsSignal *) sig; + nm_sig->common.next = NULL; + nm_sig->common.specific.next = NULL; + if (!saved_sigs->first) { + ASSERT(!saved_sigs->last); + saved_sigs->first = sig; + saved_sigs->last = &saved_sigs->first; + } + else { + ErtsSignal *last; + ASSERT(saved_sigs->last); + last = (ErtsSignal *) *saved_sigs->last; + last->common.next = sig; + last->common.specific.next = &last->common.next; + saved_sigs->last = &last->common.next; + } +} + +static erts_aint32_t +restore_delayed_nm_signals(Process *c_p, ErtsSavedNMSignals *saved_sigs) +{ + erts_aint32_t state; + ErtsSignal *lsig; + + ASSERT(saved_sigs->first && saved_sigs->last); + + lsig = (ErtsSignal *) *saved_sigs->last; + if (!c_p->sig_qs.cont) { + ASSERT(!c_p->sig_qs.nmsigs.next); + ASSERT(!c_p->sig_qs.nmsigs.last); + if (saved_sigs->last == &saved_sigs->first) + c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont; + else + c_p->sig_qs.nmsigs.last = saved_sigs->last; + c_p->sig_qs.cont_last = &lsig->common.next; + } + else { + lsig->common.next = c_p->sig_qs.cont; + if (c_p->sig_qs.nmsigs.next) { + ASSERT(c_p->sig_qs.nmsigs.last); + if (c_p->sig_qs.nmsigs.next == &c_p->sig_qs.cont) + lsig->common.specific.next = &lsig->common.next; + else + lsig->common.specific.next = c_p->sig_qs.nmsigs.next; + if (c_p->sig_qs.nmsigs.last == 
&c_p->sig_qs.cont) + c_p->sig_qs.nmsigs.last = &lsig->common.next; + } + else { + ASSERT(!c_p->sig_qs.nmsigs.last); + if (saved_sigs->last == &saved_sigs->first) + c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont; + else + c_p->sig_qs.nmsigs.last = saved_sigs->last; + if (c_p->sig_qs.cont_last == &c_p->sig_qs.cont) + c_p->sig_qs.cont_last = &lsig->common.next; + } + } + + c_p->sig_qs.cont = saved_sigs->first; + c_p->sig_qs.nmsigs.next = &c_p->sig_qs.cont; + + state = erts_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_SIG_Q); + state |= ERTS_PSFLG_SIG_Q; + return state; +} + typedef struct { ErtsSignalCommon common; Eterm ref; @@ -389,6 +479,18 @@ get_cla_data(ErtsMessage *sig) + sig->hfrag.used_size); } +static ERTS_INLINE ErtsAdjOffHeapMsgQData * +get_move_msgq_off_heap_data(ErtsMessage *sig) +{ + ASSERT(ERTS_SIG_IS_NON_MSG(sig)); + ASSERT(ERTS_PROC_SIG_OP(((ErtsSignal *) sig)->common.tag) + == ERTS_SIG_Q_OP_ADJ_MSGQ); + ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag) + == ERTS_SIG_Q_TYPE_OFF_HEAP); + return (ErtsAdjOffHeapMsgQData *) (char *) (&sig->hfrag.mem[0] + + sig->hfrag.used_size); +} + static ERTS_INLINE void destroy_trace_info(ErtsSigTraceInfo *ti) { @@ -598,7 +700,7 @@ enqueue_signals(Process *rp, ErtsMessage *first, this = dest_queue->last; - if ( ! is_to_buffer ){ + if (!is_to_buffer) { ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp); } @@ -664,7 +766,9 @@ enqueue_signals(Process *rp, ErtsMessage *first, dest_queue->len += num_msgs; - ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp); + if (!is_to_buffer) { + ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp); + } return state; } @@ -1468,22 +1572,17 @@ erts_proc_sig_cleanup_non_msg_signal(ErtsMessage *sig) Eterm tag = ((ErtsSignal *) sig)->common.tag; /* - * Heap alias message, heap frag alias message and - * adjust message queue signals are the only non-message - * signals, which are allocated as messages, which do not - * use a combined message / heap fragment. 
+ * Heap alias message and heap frag alias message are + * the only non-message signals, which are allocated as + * messages, which do not use a combined message / heap + * fragment. */ - if (ERTS_SIG_IS_HEAP_ALIAS_MSG_TAG(tag) - || tag == ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ADJ_MSGQ, - ERTS_SIG_Q_TYPE_OFF_HEAP, - 0)) { + if (ERTS_SIG_IS_HEAP_ALIAS_MSG_TAG(tag)) { sig->data.heap_frag = NULL; return; } - - - if(ERTS_SIG_IS_HEAP_FRAG_ALIAS_MSG_TAG(tag)) { + if (ERTS_SIG_IS_HEAP_FRAG_ALIAS_MSG_TAG(tag)) { /* Retrieve pointer to heap fragment (may not be NULL). */ void *attached; (void) get_alias_msg_data(sig, NULL, NULL, NULL, &attached); @@ -1494,10 +1593,45 @@ erts_proc_sig_cleanup_non_msg_signal(ErtsMessage *sig) /* * Using a combined heap fragment... */ - ErtsDistExternal *edep = get_external_non_msg_signal(sig); - if (edep) - erts_free_dist_ext_copy(edep); - + switch (ERTS_PROC_SIG_OP(tag)) { + + case ERTS_SIG_Q_OP_ADJ_MSGQ: { + /* We need to deallocate yield markers if such has been used... 
*/ + ErtsYieldAdjMsgQ *yp; + switch (ERTS_PROC_SIG_TYPE(tag)) { + case ERTS_SIG_Q_TYPE_CLA: { + ErtsCLAData *cla = get_cla_data(sig); + yp = cla->yield; + cla->yield = NULL; + break; + } + case ERTS_SIG_Q_TYPE_OFF_HEAP: { + ErtsAdjOffHeapMsgQData *ohdp = get_move_msgq_off_heap_data(sig); + yp = ohdp->yield; + ohdp->yield = NULL; + break; + } + default: + ERTS_INTERNAL_ERROR("Invalid adjust-message-queue signal type"); + yp = NULL; + break; + } + if (yp) { + ASSERT(!yp->next.in_msgq && !yp->next.in_sigq); + ASSERT(!yp->last.in_msgq && !yp->last.in_sigq); + erts_free(ERTS_ALC_T_SIG_YIELD_DATA, yp); + } + break; + } + + default: { + ErtsDistExternal *edep = get_external_non_msg_signal(sig); + if (edep) + erts_free_dist_ext_copy(edep); + break; + } + } + sig->data.attached = ERTS_MSG_COMBINED_HFRAG; hfrag = sig->hfrag.next; erts_cleanup_offheap(&sig->hfrag.off_heap); @@ -2717,6 +2851,7 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id) cla = (ErtsCLAData *) (char *) hp; hfrag->used_size = hp - start_hp; + cla->yield = NULL; cla->requester = c_p->common.id; cla->request_id = req_id_cpy; @@ -2738,8 +2873,20 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id) void erts_proc_sig_send_move_msgq_off_heap(Eterm to) { - ErtsMessage *sig = erts_alloc_message(0, NULL); + ErtsMessage *sig; + Eterm *hp; + Uint hsz; + ErtsAdjOffHeapMsgQData *ohdp; ASSERT(is_internal_pid(to)); + + hsz = sizeof(ErtsAdjOffHeapMsgQData)/sizeof(Uint); + sig = erts_alloc_message(hsz, &hp); + + ohdp = (ErtsAdjOffHeapMsgQData *) (char *) hp; + ohdp->yield = NULL; + + sig->hfrag.used_size = 0; + ERL_MESSAGE_TERM(sig) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ADJ_MSGQ, ERTS_SIG_Q_TYPE_OFF_HEAP, 0); @@ -3019,7 +3166,7 @@ remove_iq_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig) static ERTS_INLINE void remove_mq_sig(Process *c_p, ErtsMessage *sig, - ErtsMessage **next_sig, ErtsMessage ***next_nm_sig) + ErtsMessage **next_sig, ErtsMessage ***next_nm_sig) { /* * 
Remove signal from (middle) signal queue. @@ -3247,6 +3394,8 @@ recv_marker_deallocate(Process *c_p, ErtsRecvMarker *markp) ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk; int ix, nix; + ASSERT(!markp->is_yield_mark); + ASSERT(blkp); ERTS_HDBG_CHK_RECV_MRKS(c_p); @@ -3297,6 +3446,7 @@ recv_marker_dequeue(Process *c_p, ErtsRecvMarker *markp) { ErtsMessage *sigp; + ASSERT(!markp->is_yield_mark); ASSERT(markp->proc == c_p); if (markp->in_sigq <= 0) { @@ -3362,6 +3512,7 @@ recv_marker_alloc_block(Process *c_p, ErtsRecvMarkerBlock **blkpp, /* Allocate marker for 'uniqp' in index zero... */ *ixp = 0; blkp->ref[0] = recv_marker_uniq(c_p, uniqp); + blkp->marker[0].is_yield_mark = 0; markp = &blkp->marker[0]; markp->next_ix = markp->prev_ix = 0; blkp->used_ix = 0; @@ -3377,6 +3528,7 @@ recv_marker_alloc_block(Process *c_p, ErtsRecvMarkerBlock **blkpp, blkp->free_ix = 1; for (ix = 1; ix < ERTS_RECV_MARKER_BLOCK_SIZE; ix++) { blkp->ref[ix] = am_free; + blkp->marker[ix].is_yield_mark = 0; if (ix == ERTS_RECV_MARKER_BLOCK_SIZE - 1) blkp->marker[ix].next_ix = -1; /* End of list */ else @@ -3644,20 +3796,29 @@ erts_msgq_recv_marker_create_insert_set_save(Process *c_p, Eterm id) } void -erts_msgq_remove_leading_recv_markers(Process *c_p) +erts_msgq_remove_leading_recv_markers_set_save_first(Process *c_p) { + ErtsMessage **save; /* * Receive markers in the front of the queue does not - * add any value, so we just remove them... + * add any value, so we just remove them. We need to + * keep and pass yield markers though... 
*/ ASSERT(c_p->sig_qs.first && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first)); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); + save = &c_p->sig_qs.first; do { - ErtsRecvMarker *markp = (ErtsRecvMarker *) c_p->sig_qs.first; - recv_marker_dequeue(c_p, markp); - } while (c_p->sig_qs.first - && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first)); + ErtsRecvMarker *markp = (ErtsRecvMarker *) *save; + if (markp->is_yield_mark) + save = &markp->sig.common.next; + else + recv_marker_dequeue(c_p, markp); + } while (*save && ERTS_SIG_IS_RECV_MARKER(*save)); + + c_p->sig_qs.save = save; + + ASSERT(!*save || ERTS_SIG_IS_MSG(*save)); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); } @@ -3669,7 +3830,8 @@ erts_msgq_pass_recv_markers(Process *c_p, ErtsMessage **markpp) ASSERT(ERTS_SIG_IS_RECV_MARKER(sigp)); do { ErtsRecvMarker *markp = (ErtsRecvMarker *) sigp; - if (++markp->pass > ERTS_RECV_MARKER_PASS_MAX) { + if (!markp->is_yield_mark + && ++markp->pass > ERTS_RECV_MARKER_PASS_MAX) { recv_marker_dequeue(c_p, markp); sigp = *sigpp; } @@ -5235,7 +5397,8 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, int yield, cnt, limit, abs_lim, msg_tracing, save_in_msgq; ErtsMessage *sig, ***next_nm_sig; ErtsSigRecvTracing tracing; - + ErtsSavedNMSignals delayed_nm_signals = {0}; + ASSERT(!(c_p->sig_qs.flags & FS_WAIT_HANDLE_SIGS)); if (c_p->sig_qs.flags & FS_HANDLING_SIGS) state = wait_handle_signals(c_p); @@ -5759,21 +5922,43 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); break; - case ERTS_SIG_Q_OP_ADJ_MSGQ: + case ERTS_SIG_Q_OP_ADJ_MSGQ: { + int adj_limit, adj_cnt, min_adj_limit; + /* + * This may require a substantial amount of work and we + * want to get it over and done with in a reasonable + * amount of time, so we bump up the limit for it a bit... 
+ */ + min_adj_limit = ERTS_SIG_REDS_CNT_FACTOR*CONTEXT_REDS/6; + if (sig->next) + adj_limit = min_adj_limit; + else { + adj_limit = limit - cnt; + if (adj_limit < min_adj_limit) + adj_limit = min_adj_limit; + } ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); switch (ERTS_PROC_SIG_TYPE(tag)) { case ERTS_SIG_Q_TYPE_CLA: - cnt += handle_cla(c_p, sig, next_nm_sig, 0); + adj_cnt = handle_cla(c_p, sig, next_nm_sig, 0, adj_limit, + &delayed_nm_signals); break; case ERTS_SIG_Q_TYPE_OFF_HEAP: - cnt += handle_move_msgq_off_heap(c_p, sig, next_nm_sig, 0); + adj_cnt = handle_move_msgq_off_heap(c_p, sig, next_nm_sig, + 0, adj_limit, + &delayed_nm_signals); break; default: - ERTS_INTERNAL_ERROR("Invalid 'adjust-message-queue' signal type"); + ERTS_INTERNAL_ERROR("Invalid adjust-message-queue signal type"); break; } + cnt += adj_cnt; + limit += adj_cnt; + if (limit > abs_lim) + abs_lim = limit; ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); break; + } case ERTS_SIG_Q_OP_FLUSH: ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); @@ -5818,6 +6003,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, case ERTS_SIG_Q_OP_RECV_MARK: { ErtsRecvMarker *markp = (ErtsRecvMarker *) sig; ASSERT(markp->in_sigq); + ASSERT(!markp->is_yield_mark); if (markp->in_sigq < 0) { /* Marked for removal... */ @@ -6000,10 +6186,25 @@ stop: { * We know we do not have any outstanding signals * from ourselves... */ - (void) erts_atomic32_read_band_nob(&c_p->state, - ~ERTS_PSFLG_MAYBE_SELF_SIGS); + state = erts_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_MAYBE_SELF_SIGS); state &= ~ERTS_PSFLG_MAYBE_SELF_SIGS; } + + if (delayed_nm_signals.first) { + /* + * We do this after clearing ERTS_PSFLG_MAYBE_SELF_SIGS + * since there currently are no signals that can be delayed + * that should be counted as originating from the process + * itself. If such signals appear in the future this has to + * be accounted for... 
+ * + * The adjust message queue data "signal" does originate from + * the process itself, but it is not conceptually a signal. + */ + state = restore_delayed_nm_signals(c_p, &delayed_nm_signals); + } + *statep = state; /* Ensure that 'save' doesn't point to a receive marker... */ @@ -6015,7 +6216,7 @@ stop: { ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); - *redsp = cnt/4 + 1; + *redsp = cnt/ERTS_SIG_REDS_CNT_FACTOR + 1; if (yield) { int vreds = max_reds - *redsp; @@ -6277,13 +6478,14 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp, case ERTS_SIG_Q_OP_ADJ_MSGQ: switch (ERTS_PROC_SIG_TYPE(tag)) { case ERTS_SIG_Q_TYPE_CLA: - handle_cla(c_p, sig, next_nm_sig, !0); + handle_cla(c_p, sig, next_nm_sig, !0, limit, NULL); break; case ERTS_SIG_Q_TYPE_OFF_HEAP: - handle_move_msgq_off_heap(c_p, sig, next_nm_sig, !0); + handle_move_msgq_off_heap(c_p, sig, next_nm_sig, !0, + limit, NULL); break; default: - ERTS_INTERNAL_ERROR("Invalid 'adjust-message-queue' signal type"); + ERTS_INTERNAL_ERROR("Invalid adjust-message-queue signal type"); break; } break; @@ -6300,6 +6502,7 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp, case ERTS_SIG_Q_OP_RECV_MARK: { ErtsRecvMarker *markp = (ErtsRecvMarker *) sig; + ASSERT(!markp->is_yield_mark); markp->in_msgq = markp->in_sigq = markp->set_save = 0; recv_marker_deallocate(c_p, markp); break; @@ -6477,11 +6680,6 @@ erts_proc_sig_signal_size(ErtsSignal *sig) break; case ERTS_SIG_Q_OP_ADJ_MSGQ: - if (type == ERTS_SIG_Q_TYPE_OFF_HEAP) { - size = sizeof(ErtsMessageRef); - break; - } - /* Fall through... */ case ERTS_SIG_Q_OP_SYNC_SUSPEND: case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: case ERTS_SIG_Q_OP_IS_ALIVE: @@ -6717,7 +6915,6 @@ erts_proc_sig_receive_helper(Process *c_p, if (left_reds <= 0) break; /* yield */ - ASSERT(!c_p->sig_qs.cont); /* Go fetch again... 
*/ } @@ -6730,6 +6927,127 @@ erts_proc_sig_receive_helper(Process *c_p, return consumed_reds; } +static void +init_yield_marker(Process *c_p, ErtsRecvMarker *mrkp) +{ + mrkp->prev_next = NULL; + mrkp->is_yield_mark = (char) !0; + mrkp->pass = (char) 100; + mrkp->set_save = (char) 0; + mrkp->in_sigq = (char) 0; + mrkp->in_msgq = (char) 0; + mrkp->prev_ix = (char) -100; + mrkp->next_ix = (char) -100; +#ifdef DEBUG + mrkp->used = (char) !0; + mrkp->proc = c_p; +#endif + mrkp->sig.common.next = NULL; + mrkp->sig.common.specific.attachment = NULL; + mrkp->sig.common.tag = ERTS_RECV_MARKER_TAG; +} + +static void +remove_yield_marker(Process *c_p, ErtsRecvMarker *mrkp) +{ + ASSERT(mrkp); + ASSERT(mrkp->is_yield_mark); + ASSERT(mrkp->in_msgq); + remove_iq_sig(c_p, (ErtsMessage *) mrkp, mrkp->prev_next); + mrkp->in_msgq = 0; + mrkp->in_sigq = 0; + mrkp->prev_next = NULL; + mrkp->sig.common.next = NULL; +} + +static ErtsYieldAdjMsgQ * +create_yield_adj_msgq_data(Process *c_p) +{ + ErtsYieldAdjMsgQ *yp = erts_alloc(ERTS_ALC_T_SIG_YIELD_DATA, + sizeof(ErtsYieldAdjMsgQ)); + init_yield_marker(c_p, &yp->next); + init_yield_marker(c_p, &yp->last); + return yp; +} + +static ERTS_INLINE void +insert_adj_msgq_yield_markers(Process *c_p, + ErtsYieldAdjMsgQ *yp, + ErtsMessage **nextpp, + ErtsMessage ***next_nm_sig, + ErtsSavedNMSignals *saved_sigs) +{ + ErtsMessage *sig, *nextp; + + ASSERT(yp); + ASSERT(nextpp); + ASSERT(next_nm_sig && *next_nm_sig && **next_nm_sig); + ASSERT(!yp->next.in_msgq); + + sig = **next_nm_sig; + + ASSERT(ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(sig)) + == ERTS_SIG_Q_OP_ADJ_MSGQ); + + /* + * Insert 'next' yield marker. This is in the inner queue or + * in the beginning of the middle queue where we've already + * begun using 'prev_next' pointers for receive markers, + * so if a receive marker follow, we need to update it. 
+ */ + yp->next.in_msgq = !0; + yp->next.in_sigq = !0; + yp->next.prev_next = nextpp; + yp->next.sig.common.next = nextp = *nextpp; + *nextpp = (ErtsMessage *) &yp->next; + + ERTS_SIG_DBG_RECV_MARK_SET_HANDLED(&yp->next); + + if (nextp && ERTS_SIG_IS_RECV_MARKER(nextp)) { + ErtsRecvMarker *next_mrkp = (ErtsRecvMarker *) nextp; + next_mrkp->prev_next = &yp->next.sig.common.next; + } + + if (yp->last.in_msgq) { + remove_nm_sig(c_p, sig, next_nm_sig); + } + else { + /* + * Replace adj-msgq signal with 'last' yield marker. + * + * This is in the middle queue after the point where + * we've begun using 'prev_next' pointers for receive + * markers, so if a receive marker follow, we do not + * need to adjust its 'prev_next'. + */ + ErtsMessage **next_sig = *next_nm_sig; + yp->last.in_msgq = (char) !0; + yp->last.in_sigq = (char) !0; + yp->last.prev_next = next_sig; + *next_nm_sig = ((ErtsSignal *) sig)->common.specific.next; + *next_sig = (ErtsMessage *) &yp->last; + remove_mq_sig(c_p, sig, &yp->last.sig.common.next, next_nm_sig); + + ERTS_SIG_DBG_RECV_MARK_SET_HANDLED(&yp->last); + } + + save_delayed_nm_signal(saved_sigs, sig); +} + +static ERTS_INLINE void +destroy_adj_msgq_yield_markers(Process *c_p, ErtsYieldAdjMsgQ **ypp) +{ + ErtsYieldAdjMsgQ *yp = *ypp; + if (yp) { + if (yp->next.in_msgq) + remove_yield_marker(c_p, &yp->next); + if (yp->last.in_msgq) + remove_yield_marker(c_p, &yp->last); + erts_free(ERTS_ALC_T_SIG_YIELD_DATA, yp); + *ypp = NULL; + } +} + static Uint area_literal_size(Eterm* start, Eterm* end, char* lit_start, Uint lit_size) { @@ -6851,17 +7169,16 @@ static int handle_cla(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig, - int exiting) + int exiting, + int limit, + ErtsSavedNMSignals *saved_nm_sigs) { - /* - * TODO: Implement yielding support! 
- */ ErtsCLAData *cla; - ErtsMessage *msg; + ErtsMessage *msg, *endp; ErtsLiteralArea *la; char *literals; Uint lit_bsize; - int nmsgs, reds; + int nmsgs, reds, stretch_yield_limit = 0; Eterm result = am_ok; Uint64 cnt = 0; @@ -6882,6 +7199,30 @@ handle_cla(Process *c_p, * can therefore occur behind this signal. */ + msg = c_p->sig_qs.first; + if (!msg) + msg = c_p->sig_qs.cont; + + if (!cla->yield) { + endp = sig; + } + else { + if (!cla->yield->next.in_msgq) { + /* All messages already handled... */ + ASSERT(!cla->yield->last.in_msgq); + stretch_yield_limit = !0; + endp = msg = sig; + } + else { + ASSERT(!!cla->yield->last.in_msgq); + msg = cla->yield->next.sig.common.next; + endp = (ErtsMessage *) &cla->yield->last; + remove_yield_marker(c_p, &cla->yield->next); + } + } + + ASSERT(!cla->yield || !cla->yield->next.in_msgq); + la = ERTS_COPY_LITERAL_AREA(); if (!la) { ASSERT(0); @@ -6894,12 +7235,8 @@ handle_cla(Process *c_p, literals = (char *) &la->start[0]; lit_bsize = (char *) la->end - literals; - msg = c_p->sig_qs.first; - if (!msg) - msg = c_p->sig_qs.cont; - nmsgs = 0; - while (msg != sig) { + while (msg != endp) { ASSERT(!!msg); nmsgs++; if (nmsgs >= ERTS_PROC_SIG_ADJ_MSGQ_MSGS_FACTOR) { @@ -6994,6 +7331,18 @@ handle_cla(Process *c_p, } } + if (cnt > limit) { /* yield... */ + ErtsMessage **nextpp = !msg->next ? &c_p->sig_qs.cont : &msg->next; + ASSERT(*nextpp); + if (*nextpp == endp) + break; /* we're at the end; no point yielding here... 
*/ + if (!cla->yield) + cla->yield = create_yield_adj_msgq_data(c_p); + insert_adj_msgq_yield_markers(c_p, cla->yield, nextpp, + next_nm_sig, saved_nm_sigs); + return cnt; + } + msg = msg->next; if (!msg) msg = c_p->sig_qs.cont; @@ -7001,18 +7350,36 @@ handle_cla(Process *c_p, remove_nm_sig(c_p, sig, next_nm_sig); - reds = 0; - if (erts_check_copy_literals_gc_need(c_p, &reds, literals, lit_bsize)) - result = am_need_gc; - - cnt += reds * ERTS_SIG_REDS_CNT_FACTOR; + reds = erts_check_copy_literals_gc_need_max_reds(c_p); + cnt++; + if (reds > CONTEXT_REDS) + result = am_check_gc; + else if (stretch_yield_limit + || cnt + reds*ERTS_SIG_REDS_CNT_FACTOR <= limit) { + reds = 0; + if (erts_check_copy_literals_gc_need(c_p, &reds, literals, lit_bsize)) + result = am_need_gc; + cnt += reds * ERTS_SIG_REDS_CNT_FACTOR; + } + else { + /* yield... */ + if (!cla->yield) + cla->yield = create_yield_adj_msgq_data(c_p); + else if (!!cla->yield->last.in_msgq) + remove_yield_marker(c_p, &cla->yield->last); + ASSERT(!cla->yield->next.in_msgq); + save_delayed_nm_signal(saved_nm_sigs, sig); + return cnt; + } done: + destroy_adj_msgq_yield_markers(c_p, &cla->yield); + send_cla_reply(c_p, sig, cla->requester, cla->request_id, result); - if (cnt > CONTEXT_REDS) - return CONTEXT_REDS; + if (cnt > CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR) + return CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR; return cnt; } @@ -7020,12 +7387,12 @@ static int handle_move_msgq_off_heap(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig, - int exiting) + int exiting, + int limit, + ErtsSavedNMSignals *saved_nm_sigs) { - /* - * TODO: Implement yielding support! - */ - ErtsMessage *msg; + ErtsAdjOffHeapMsgQData *ohdp; + ErtsMessage *msg, *endp; int nmsgs; Uint64 cnt = 0; @@ -7045,6 +7412,8 @@ handle_move_msgq_off_heap(Process *c_p, cnt++; + ohdp = get_move_msgq_off_heap_data(sig); + if (exiting) { /* signal already removed from queue... 
*/ goto cleanup; @@ -7065,8 +7434,21 @@ handle_move_msgq_off_heap(Process *c_p, if (!msg) msg = c_p->sig_qs.cont; + if (!ohdp->yield) { + endp = sig; + } + else { + ASSERT(!!ohdp->yield->next.in_msgq); + ASSERT(!!ohdp->yield->last.in_msgq); + msg = ohdp->yield->next.sig.common.next; + endp = (ErtsMessage *) &ohdp->yield->last; + remove_yield_marker(c_p, &ohdp->yield->next); + } + + ASSERT(!ohdp->yield || !ohdp->yield->next.in_msgq); + nmsgs = 0; - while (msg != sig) { + while (msg != endp) { ASSERT(!!msg); nmsgs++; if (nmsgs >= ERTS_PROC_SIG_ADJ_MSGQ_MSGS_FACTOR) { @@ -7143,6 +7525,18 @@ handle_move_msgq_off_heap(Process *c_p, cnt += h_sz/ERTS_PROC_SIG_ADJ_MSGQ_COPY_FACTOR; } + if (cnt > limit) { /* yield... */ + ErtsMessage **nextpp = !msg->next ? &c_p->sig_qs.cont : &msg->next; + ASSERT(*nextpp); + if (*nextpp == endp) + break; /* we're at the end; no point yielding... */ + if (!ohdp->yield) + ohdp->yield = create_yield_adj_msgq_data(c_p); + insert_adj_msgq_yield_markers(c_p, ohdp->yield, nextpp, + next_nm_sig, saved_nm_sigs); + return cnt; + } + msg = msg->next; if (!msg) msg = c_p->sig_qs.cont; @@ -7154,13 +7548,15 @@ done: cleanup: + destroy_adj_msgq_yield_markers(c_p, &ohdp->yield); sig->next = NULL; + sig->data.attached = ERTS_MSG_COMBINED_HFRAG; erts_cleanup_messages(sig); c_p->sig_qs.flags &= ~FS_OFF_HEAP_MSGQ_CHNG; - if (cnt > CONTEXT_REDS) - return CONTEXT_REDS; + if (cnt > CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR) + return CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR; return cnt; } @@ -7601,9 +7997,13 @@ erts_proc_sig_cleanup_queues(Process *c_p) while (sig) { ErtsMessage *free_sig = sig; sig = sig->next; - if (ERTS_SIG_IS_RECV_MARKER(free_sig)) - recv_marker_deallocate(c_p, (ErtsRecvMarker *) free_sig); + if (ERTS_SIG_IS_RECV_MARKER(free_sig)) { + ErtsRecvMarker *recv_mark = (ErtsRecvMarker *) free_sig; + ASSERT(!recv_mark->is_yield_mark); + recv_marker_deallocate(c_p, recv_mark); + } else { + ASSERT(ERTS_SIG_IS_MSG(free_sig)); free_sig->next = NULL; 
erts_cleanup_messages(free_sig); } @@ -7801,9 +8201,6 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, break; case ERTS_SIG_Q_OP_ADJ_MSGQ: - if (type == ERTS_SIG_Q_TYPE_OFF_HEAP) - break; - /* Fall through... */ case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg); break; @@ -8146,7 +8543,10 @@ proc_sig_hdbg_check_queue(Process *proc, if (sig_psflg != ERTS_PSFLG_FREE) { erts_aint32_t state = erts_atomic32_read_nob(&proc->state); - ERTS_ASSERT(nm_sigs ? !!(state & sig_psflg) : !(state & sig_psflg)); + ERTS_ASSERT(nm_sigs + ? !!(state & sig_psflg) + : (!(state & sig_psflg) + || !!erts_atomic_read_nob(&proc->sig_inq_buffers))); } return msg_len; diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index deb1b802e1..8faa79507f 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -1736,7 +1736,7 @@ Eterm erts_msgq_recv_marker_create_insert(Process *c_p, Eterm id); void erts_msgq_recv_marker_create_insert_set_save(Process *c_p, Eterm id); ErtsMessage **erts_msgq_pass_recv_markers(Process *c_p, ErtsMessage **markpp); -void erts_msgq_remove_leading_recv_markers(Process *c_p); +void erts_msgq_remove_leading_recv_markers_set_save_first(Process *c_p); #define ERTS_RECV_MARKER_IX__(BLKP, MRKP) \ ((int) ((MRKP) - &(BLKP)->marker[0])) @@ -2112,8 +2112,9 @@ erts_msgq_set_save_first(Process *c_p) * anymore... 
*/ if (c_p->sig_qs.first && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first)) - erts_msgq_remove_leading_recv_markers(c_p); - c_p->sig_qs.save = &c_p->sig_qs.first; + erts_msgq_remove_leading_recv_markers_set_save_first(c_p); + else + c_p->sig_qs.save = &c_p->sig_qs.first; } ERTS_GLB_INLINE void diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 393de2b7b1..00b6d16862 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -6631,10 +6631,12 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, */ /* - * No normal execution until dirty CLA or hibernat has + * No normal execution until dirty CLA or hibernate has * been handled... */ - ASSERT(!(p->flags & (F_DIRTY_CLA | F_DIRTY_GC_HIBERNATE))); + ASSERT(!(p->flags & (F_DIRTY_CHECK_CLA + | F_DIRTY_CLA + | F_DIRTY_GC_HIBERNATE))); a = erts_atomic32_read_band_nob(&p->state, ~ERTS_PSFLG_DIRTY_ACTIVE_SYS); @@ -10075,12 +10077,17 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) * continue exectution until the flush * completes... */ - if (state & ERTS_PSFLGS_DIRTY_WORK) - sig_reds = reds; - else if (p->sig_qs.flags & FS_FLUSHING_SIGS) - sig_reds = reds; - else - sig_reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED; + sig_reds = reds; + if (((state & (ERTS_PSFLGS_DIRTY_WORK + | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE) + && !(p->sig_qs.flags & FS_FLUSHING_SIGS)) { + /* + * We are active, i.e., have erlang work to do, + * and have no dirty work and are not flushing + * limit amount of signal handling work... 
+ */ + sig_reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED; + } (void) erts_proc_sig_handle_incoming(p, &state, &sig_reds, @@ -10600,25 +10607,49 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) int fcalls; int cla_reds = 0; - if (!ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) - fcalls = reds; - else - fcalls = reds - CONTEXT_REDS; - st_res = erts_copy_literals_gc(c_p, &cla_reds, fcalls); - reds -= cla_reds; - if (is_non_value(st_res)) { - if (c_p->flags & F_DIRTY_CLA) { - save_dirty_task(c_p, st); - st = NULL; - break; - } - /* Needed gc, but gc was disabled */ - save_gc_task(c_p, st, st_prio); - st = NULL; - break; - } - /* We did a major gc */ - minor_gc = major_gc = 1; + if (st->arg[0] == am_true) { + /* + * Check if copy literal area GC is needed and only + * do GC if needed. This check is never requested unless + * we know that this is too much work to do on a normal + * scheduler, so we do not even try to check it here + * but instead unconditionally schedule this as dirty + * work... + */ + if (c_p->flags & F_DISABLE_GC) { + /* We might need to GC, but GC was disabled */ + save_gc_task(c_p, st, st_prio); + st = NULL; + } + else { + c_p->flags |= F_DIRTY_CHECK_CLA; + save_dirty_task(c_p, st); + st = NULL; + erts_schedule_dirty_sys_execution(c_p); + } + } + else { + /* Copy literal area GC needed... */ + if (!ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) + fcalls = reds; + else + fcalls = reds - CONTEXT_REDS; + st_res = erts_copy_literals_gc(c_p, &cla_reds, fcalls); + reds -= cla_reds; + if (is_non_value(st_res)) { + if (c_p->flags & F_DIRTY_CLA) { + save_dirty_task(c_p, st); + st = NULL; + break; + } + /* Needed gc, but gc was disabled */ + save_gc_task(c_p, st, st_prio); + st = NULL; + break; + } + /* We did a major gc */ + minor_gc = major_gc = 1; + } break; } case ERTS_PSTT_FTMQ: @@ -10823,10 +10854,12 @@ erts_execute_dirty_system_task(Process *c_p) /* * If multiple operations, perform them in the following * order (in order to avoid unnecessary GC): - * 1. 
Copy Literal Area (implies major GC). - * 2. GC Hibernate (implies major GC if not woken). - * 3. Major GC (implies minor GC). - * 4. Minor GC. + * 1. Check for Copy Literals Area GC need. This may + * trigger a Copy Literals Area GC. + * 2. Copy Literal Area GC (implies major GC). + * 3. GC Hibernate (implies major GC if not woken). + * 4. Major GC (implies minor GC). + * 5. Minor GC. * * System task requests are handled after the actual * operations have been performed... @@ -10834,6 +10867,37 @@ erts_execute_dirty_system_task(Process *c_p) ASSERT(!(c_p->flags & (F_DELAY_GC|F_DISABLE_GC))); + if (c_p->flags & F_DIRTY_CHECK_CLA) { + ErtsLiteralArea *la = ERTS_COPY_LITERAL_AREA(); + + ASSERT(!(c_p->flags & F_DIRTY_CLA)); + c_p->flags &= ~F_DIRTY_CHECK_CLA; + if (!la) + cla_res = am_ok; + else { + int check_cla_reds = 0; + char *literals = (char *) &la->start[0]; + Uint lit_bsize = (char *) la->end - literals; + if (erts_check_copy_literals_gc_need(c_p, + &check_cla_reds, + literals, + lit_bsize)) { + /* + * We had references to this literal area on the heap; + * need a copy literals GC... + */ + c_p->flags |= F_DIRTY_CLA; + } + else { + /* + * We have no references to this literal area on the + * heap; no copy literals GC needed... 
+ */ + cla_res = am_ok; + } + } + } + if (c_p->flags & F_DIRTY_CLA) { int cla_reds = 0; cla_res = erts_copy_literals_gc(c_p, &cla_reds, c_p->fcalls); @@ -10862,7 +10926,8 @@ erts_execute_dirty_system_task(Process *c_p) c_p->arity, c_p->fcalls); } - ASSERT(!(c_p->flags & (F_DIRTY_CLA + ASSERT(!(c_p->flags & (F_DIRTY_CHECK_CLA + | F_DIRTY_CLA | F_DIRTY_GC_HIBERNATE | F_DIRTY_MAJOR_GC | F_DIRTY_MINOR_GC))); @@ -11246,7 +11311,7 @@ erts_internal_request_system_task_4(BIF_ALIST_4) } void -erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id) +erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id, int check) { Process *rp; ErtsProcSysTask *st; @@ -11272,7 +11337,8 @@ erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id) req_id_sz, &hp, &st->off_heap); - for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) + st->arg[0] = check ? am_true : am_false; + for (i = 1; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) st->arg[i] = THE_NON_VALUE; rp = erts_proc_lookup_raw(to); diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 0459aeaba5..8dc55a8da0 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1568,6 +1568,7 @@ extern int erts_system_profile_ts_type; #define F_TRAP_EXIT (1 << 22) /* Trapping exit */ #define F_FRAGMENTED_SEND (1 << 23) /* Process is doing a distributed fragmented send */ #define F_DBG_FORCED_TRAP (1 << 24) /* DEBUG: Last BIF call was a forced trap */ +#define F_DIRTY_CHECK_CLA (1 << 25) /* Check if copy literal area GC scheduled */ /* Signal queue flags */ #define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */ @@ -1934,7 +1935,7 @@ void erts_schedule_thr_prgr_later_cleanup_op(void (*)(void *), ErtsThrPrgrLaterOp *, UWord); void erts_schedule_complete_off_heap_message_queue_change(Eterm pid); -void erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id); +void erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id, int check); struct db_fixation; void 
erts_schedule_ets_free_fixation(Eterm pid, struct db_fixation*); void erts_schedule_flush_trace_messages(Process *proc, int force_on_proc); diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index f892d0dd89..f54596cf58 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -957,6 +957,7 @@ Eterm erl_is_function(Process* p, Eterm arg1, Eterm arg2); Eterm erts_check_process_code(Process *c_p, Eterm module, int *redsp, int fcalls); #define ERTS_CLA_SCAN_WORDS_PER_RED 512 +int erts_check_copy_literals_gc_need_max_reds(Process *c_p); int erts_check_copy_literals_gc_need(Process *c_p, int *redsp, char *literals, Uint lit_bsize); Eterm erts_copy_literals_gc(Process *c_p, int *redsp, int fcalls); diff --git a/erts/emulator/test/literal_area_collector_test.erl b/erts/emulator/test/literal_area_collector_test.erl index 14583db88d..d0b6fb3822 100644 --- a/erts/emulator/test/literal_area_collector_test.erl +++ b/erts/emulator/test/literal_area_collector_test.erl @@ -28,38 +28,29 @@ check_idle(Timeout) when is_integer(Timeout) > 0 -> ScaledTimeout = Timeout*test_server:timetrap_scale_factor(), Pid = find_literal_area_collector(), Start = erlang:monotonic_time(millisecond), - try - wait_for_idle_literal_collector(Pid, Start, ScaledTimeout, -1, 0) - catch - throw:done -> - ok - end. + Alias = alias(), + wait_for_idle_literal_collector(Pid, Alias, Start, ScaledTimeout). -wait_for_idle_literal_collector(Pid, Start, Timeout, NWaiting, WRedsStart) -> - {W, R} = case process_info(Pid, [status, reductions]) of - [{status, waiting}, {reductions, Reds}] -> - %% Assume that reds aren't bumped more than - %% 2 in order to service this process info - %% request... - case {NWaiting > 100, Reds - WRedsStart =< 2*NWaiting} of - {true, true} -> - throw(done); - {false, true} -> - {NWaiting+1, WRedsStart}; - _ -> - {0, Reds} - end; - _ -> - {-1, 0} - end, +wait_for_idle_literal_collector(Pid, Alias, Start, Timeout) -> + Ref = make_ref(), + Pid ! 
{get_status, Ref, Alias}, Now = erlang:monotonic_time(millisecond), - if Now - Start > Timeout -> - error({busy_literal_area_collecor_timout, Timeout}); - true -> - ok - end, - receive after 1 -> ok end, - wait_for_idle_literal_collector(Pid, Start, Timeout, W, R). + TMO = case Start + Timeout - Now of + TimeLeft when TimeLeft < 0 -> 0; + TimeLeft -> TimeLeft + end, + receive + {Ref, idle} -> + unalias(Alias), + ok; + {Ref, _} -> + receive after 10 -> ok end, + wait_for_idle_literal_collector(Pid, Alias, Start, Timeout) + after TMO -> + unalias(Alias), + receive {Ref, _} -> ok after 0 -> ok end, + error({busy_literal_area_collecor_timout, Timeout}) + end. find_literal_area_collector() -> case get('__literal_area_collector__') of diff --git a/erts/emulator/test/signal_SUITE.erl b/erts/emulator/test/signal_SUITE.erl index e1fcd3fbd4..ec2cbfb80e 100644 --- a/erts/emulator/test/signal_SUITE.erl +++ b/erts/emulator/test/signal_SUITE.erl @@ -32,6 +32,7 @@ -include_lib("common_test/include/ct.hrl"). -export([all/0, suite/0,init_per_suite/1, end_per_suite/1]). -export([init_per_testcase/2, end_per_testcase/2]). +-export([groups/0, init_per_group/2, end_per_group/2]). % Test cases -export([xm_sig_order/1, @@ -46,7 +47,19 @@ monitor_order/1, monitor_named_order_local/1, monitor_named_order_remote/1, - monitor_nodes_order/1]). + monitor_nodes_order/1, + move_msgs_off_heap_signal_basic/1, + move_msgs_off_heap_signal_recv/1, + move_msgs_off_heap_signal_exit/1, + move_msgs_off_heap_signal_recv_exit/1, + copy_literal_area_signal_basic/1, + copy_literal_area_signal_recv/1, + copy_literal_area_signal_exit/1, + copy_literal_area_signal_recv_exit/1, + simultaneous_signals_basic/1, + simultaneous_signals_recv/1, + simultaneous_signals_exit/1, + simultaneous_signals_recv_exit/1]). -export([spawn_spammers/3]). @@ -79,7 +92,29 @@ all() -> monitor_order, monitor_named_order_local, monitor_named_order_remote, - monitor_nodes_order]. + monitor_nodes_order, + {group, adjust_message_queue}]. 
+ +groups() -> + [{adjust_message_queue, [], + [move_msgs_off_heap_signal_basic, + move_msgs_off_heap_signal_recv, + move_msgs_off_heap_signal_exit, + move_msgs_off_heap_signal_recv_exit, + copy_literal_area_signal_basic, + copy_literal_area_signal_recv, + copy_literal_area_signal_exit, + copy_literal_area_signal_recv_exit, + simultaneous_signals_basic, + simultaneous_signals_recv, + simultaneous_signals_exit, + simultaneous_signals_recv_exit]}]. + +init_per_group(_GroupName, Config) -> + Config. + +end_per_group(_GroupName, Config) -> + Config. %% Test that exit signals and messages are received in correct order xm_sig_order(Config) when is_list(Config) -> @@ -706,6 +741,281 @@ receive_filter_spam() -> M -> M end. +move_msgs_off_heap_signal_basic(Config) when is_list(Config) -> + move_msgs_off_heap_signal_test(false, false). + +move_msgs_off_heap_signal_recv(Config) when is_list(Config) -> + move_msgs_off_heap_signal_test(true, false). + +move_msgs_off_heap_signal_exit(Config) when is_list(Config) -> + move_msgs_off_heap_signal_test(false, true). + +move_msgs_off_heap_signal_recv_exit(Config) when is_list(Config) -> + move_msgs_off_heap_signal_test(true, true). + +move_msgs_off_heap_signal_test(RecvPair, Exit) -> + erlang:trace(new_processes, true, [running_procs]), + SFact = test_server:timetrap_scale_factor(), + GoTime = erlang:monotonic_time(millisecond) + 1000*SFact, + ProcF = fun () -> + Now = erlang:monotonic_time(millisecond), + Tmo = case GoTime - Now of + Left when Left < 0 -> + erlang:display({go_time_passed, Left}), + 0; + Left -> + Left + end, + receive after Tmo -> ok end, + on_heap = process_flag(message_queue_data, off_heap), + if RecvPair -> receive_integer_pairs(infinity); + true -> receive after infinity -> ok end + end + end, + Ps = lists:map(fun (_) -> + spawn_opt(ProcF, + [link, + {message_queue_data, on_heap}]) + end, lists:seq(1, 100)), + lists:foreach(fun (P) -> + lists:foreach(fun (N) when N rem 100 == 0 -> + P ! [N|N]; + (N) -> + P ! 
N + end, lists:seq(1, 10000)) + end, Ps), + Now = erlang:monotonic_time(millisecond), + Tmo = case GoTime - Now + 10 of + Left when Left < 0 -> + erlang:display({go_time_passed, Left}), + 0; + Left -> + Left + end, + receive after Tmo -> ok end, + if Exit -> + _ = lists:foldl(fun (P, N) when N rem 10 -> + unlink(P), + exit(P, terminated), + N+1; + (_P, N) -> + N+1 + end, + 0, + Ps), + ok; + true -> + ok + end, + wait_traced_not_running(1000 + 200*SFact), + erlang:trace(new_processes, false, [running_procs]), + lists:foreach(fun (P) -> + unlink(P), + exit(P, kill) + end, Ps), + lists:foreach(fun (P) -> + false = is_process_alive(P) + end, Ps), + ok. + +copy_literal_area_signal_basic(Config) when is_list(Config) -> + copy_literal_area_signal_test(false, false). + +copy_literal_area_signal_recv(Config) when is_list(Config) -> + copy_literal_area_signal_test(true, false). + +copy_literal_area_signal_exit(Config) when is_list(Config) -> + copy_literal_area_signal_test(false, true). + +copy_literal_area_signal_recv_exit(Config) when is_list(Config) -> + copy_literal_area_signal_test(true, true). + +copy_literal_area_signal_test(RecvPair, Exit) -> + persistent_term:put({?MODULE, ?FUNCTION_NAME}, make_ref()), + Literal = persistent_term:get({?MODULE, ?FUNCTION_NAME}), + true = is_reference(Literal), + 0 = erts_debug:size_shared(Literal), %% Should be a literal... + ProcF = fun () -> + 0 = erts_debug:size_shared(Literal), %% Should be a literal... + if RecvPair -> + receive receive_pairs -> ok end, + receive_integer_pairs(0); + true -> + ok + end, + receive check_literal_conversion -> ok end, + receive + Literal -> + %% Should not be a literal anymore... + false = (0 == erts_debug:size_shared(Literal)) + end + end, + PMs = lists:map(fun (_) -> + spawn_opt(ProcF, [link, monitor]) + end, lists:seq(1, 100)), + lists:foreach(fun ({P,_M}) -> + lists:foreach(fun (N) when N rem 100 == 0 -> + P ! [N|N]; + (N) -> + P ! N + end, lists:seq(1, 10000)), + P ! 
Literal + end, PMs), + persistent_term:erase({?MODULE, ?FUNCTION_NAME}), + receive after 1 -> ok end, + if RecvPair -> + lists:foreach(fun ({P,_M}) -> + P ! receive_pairs + end, PMs); + true -> + ok + end, + if Exit -> + _ = lists:foldl(fun ({P, _M}, N) when N rem 10 -> + unlink(P), + exit(P, terminated), + N+1; + (_PM, N) -> + N+1 + end, + 0, + PMs), + ok; + true -> + ok + end, + literal_area_collector_test:check_idle(), + lists:foreach(fun ({P,_M}) -> + P ! check_literal_conversion + end, PMs), + lists:foreach(fun ({P, M}) -> + receive + {'DOWN', M, process, P, R} -> + case R of + normal -> ok; + terminated -> ok + end + end + end, PMs), + ok. + +simultaneous_signals_basic(Config) when is_list(Config) -> + simultaneous_signals_test(false, false). + +simultaneous_signals_recv(Config) when is_list(Config) -> + simultaneous_signals_test(true, false). + +simultaneous_signals_exit(Config) when is_list(Config) -> + simultaneous_signals_test(false, true). + +simultaneous_signals_recv_exit(Config) when is_list(Config) -> + simultaneous_signals_test(true, true). + +simultaneous_signals_test(RecvPairs, Exit) -> + erlang:trace(new_processes, true, [running_procs]), + persistent_term:put({?MODULE, ?FUNCTION_NAME}, make_ref()), + Literal = persistent_term:get({?MODULE, ?FUNCTION_NAME}), + true = is_reference(Literal), + 0 = erts_debug:size_shared(Literal), %% Should be a literal... + SFact = test_server:timetrap_scale_factor(), + GoTime = erlang:monotonic_time(millisecond) + 1000*SFact, + ProcF = fun () -> + 0 = erts_debug:size_shared(Literal), %% Should be a literal... 
+ Now = erlang:monotonic_time(millisecond), + Tmo = case GoTime - Now of + Left when Left < 0 -> + erlang:display({go_time_passed, Left}), + 0; + Left -> + Left + end, + receive after Tmo -> ok end, + on_heap = process_flag(message_queue_data, off_heap), + if RecvPairs -> receive_integer_pairs(0); + true -> ok + end, + receive check_literal_conversion -> ok end, + receive + Literal -> + %% Should not be a literal anymore... + false = (0 == erts_debug:size_shared(Literal)) + end + end, + PMs = lists:map(fun (_) -> + spawn_opt(ProcF, + [link, + monitor, + {message_queue_data, on_heap}]) + end, lists:seq(1, 100)), + lists:foreach(fun ({P,_M}) -> + lists:foreach(fun (N) when N rem 100 == 0 -> + P ! [N|N]; + (N) -> + P ! N + end, lists:seq(1, 10000)), + P ! Literal + end, PMs), + Now = erlang:monotonic_time(millisecond), + Tmo = case GoTime - Now - 5 of % a bit earlier... + Left when Left < 0 -> + erlang:display({go_time_passed, Left}), + 0; + Left -> + Left + end, + receive after Tmo -> ok end, + persistent_term:erase({?MODULE, ?FUNCTION_NAME}), + receive after 10 -> ok end, + if Exit -> + _ = lists:foldl(fun ({P, _M}, N) when N rem 10 -> + unlink(P), + exit(P, terminated), + N+1; + (_PM, N) -> + N+1 + end, + 0, + PMs), + ok; + true -> + ok + end, + wait_traced_not_running(1000 + 200*SFact), + erlang:trace(new_processes, false, [running_procs]), + literal_area_collector_test:check_idle(), + lists:foreach(fun ({P,_M}) -> + P ! check_literal_conversion + end, PMs), + lists:foreach(fun ({P, M}) -> + receive + {'DOWN', M, process, P, R} -> + case R of + normal -> ok; + terminated -> ok + end + end + end, PMs), + ok. + + +wait_traced_not_running(Tmo) -> + receive + {trace,_,What,_} when What == in; + What == out -> + wait_traced_not_running(Tmo) + after + Tmo -> + ok + end. + +receive_integer_pairs(Tmo) -> + receive + [N|N] -> + receive_integer_pairs(Tmo) + after + Tmo -> + ok + end. 
%% %% -- Internal utils -------------------------------------------------------- diff --git a/erts/preloaded/ebin/erts_literal_area_collector.beam b/erts/preloaded/ebin/erts_literal_area_collector.beam Binary files differindex d54baaf4a2..165bfa4622 100644 --- a/erts/preloaded/ebin/erts_literal_area_collector.beam +++ b/erts/preloaded/ebin/erts_literal_area_collector.beam diff --git a/erts/preloaded/src/erts_literal_area_collector.erl b/erts/preloaded/src/erts_literal_area_collector.erl index 8a73ed1685..bb2ad6b919 100644 --- a/erts/preloaded/src/erts_literal_area_collector.erl +++ b/erts/preloaded/src/erts_literal_area_collector.erl @@ -62,39 +62,46 @@ msg_loop(Area, {Ongoing, NeedIReq} = OReqInfo, GcOutstnd, NeedGC) -> switch_area(); %% Process (_Pid) has completed the request... - {copy_literals, {Area, _GcAllowed, _Pid}, ok} when Ongoing == 1, - NeedIReq == [] -> + {copy_literals, {Area, _ReqType, _Pid}, ok} when Ongoing == 1, + NeedIReq == [] -> switch_area(); %% Last process completed... - {copy_literals, {Area, false, _Pid}, ok} -> + {copy_literals, {Area, init, _Pid}, ok} -> msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq), GcOutstnd, NeedGC); - {copy_literals, {Area, true, _Pid}, ok} when NeedGC == [] -> + {copy_literals, {Area, ReqType, _Pid}, ok} when NeedGC == [], + ReqType /= init -> msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq), GcOutstnd-1, []); - {copy_literals, {Area, true, _Pid}, ok} -> - send_copy_req(hd(NeedGC), Area, true), - msg_loop(Area, {Ongoing-1, NeedIReq}, GcOutstnd, tl(NeedGC)); + {copy_literals, {Area, ReqType, _Pid}, ok} when ReqType /= init -> + [{GCPid,GCWork} | NewNeedGC] = NeedGC, + send_copy_req(GCPid, Area, GCWork), + msg_loop(Area, {Ongoing-1, NeedIReq}, GcOutstnd, NewNeedGC); %% Process (Pid) failed to complete the request %% since it needs to garbage collect in order to %% complete the request... 
- {copy_literals, {Area, false, Pid}, need_gc} when GcOutstnd < ?MAX_GC_OUTSTND -> - send_copy_req(Pid, Area, true), + {copy_literals, {Area, init, Pid}, GCWork} when GcOutstnd + < ?MAX_GC_OUTSTND -> + send_copy_req(Pid, Area, GCWork), msg_loop(Area, OReqInfo, GcOutstnd+1, NeedGC); - {copy_literals, {Area, false, Pid}, need_gc} -> + {copy_literals, {Area, init, Pid}, GCWork} -> msg_loop(Area, check_send_copy_req(Area, Ongoing, NeedIReq), - GcOutstnd, [Pid|NeedGC]); + GcOutstnd, [{Pid,GCWork} | NeedGC]); %% Not handled message regarding the area that we %% currently are working with. Crash the VM so %% we notice this bug... - {copy_literals, {Area, _, _}, _} = Msg when erlang:is_reference(Area) -> + {copy_literals, {Area, _, _}, _} = Msg -> exit({not_handled_message, Msg}); {change_prio, From, Ref, Prio} -> change_prio(From, Ref, Prio), msg_loop(Area, OReqInfo, GcOutstnd, NeedGC); + {get_status, Ref, From} when is_pid(From); is_reference(From) -> + From ! {Ref, if Ongoing == 0 -> idle; true -> working end}, + msg_loop(Area, OReqInfo, GcOutstnd, NeedGC); + %% Unexpected garbage message. Get rid of it... _Ignore -> msg_loop(Area, OReqInfo, GcOutstnd, NeedGC) @@ -126,7 +133,7 @@ switch_area() -> check_send_copy_req(_Area, Ongoing, []) -> {Ongoing, []}; check_send_copy_req(Area, Ongoing, [Pid|Pids]) -> - send_copy_req(Pid, Area, false), + send_copy_req(Pid, Area, init), {Ongoing+1, Pids}. send_copy_reqs(Ps, Area, OReqLim) -> @@ -137,23 +144,23 @@ send_copy_reqs([], _Area, _OReqLim, N) -> send_copy_reqs(Ps, _Area, OReqLim, N) when N >= OReqLim -> {N, Ps}; send_copy_reqs([P|Ps], Area, OReqLim, N) -> - send_copy_req(P, Area, false), + send_copy_req(P, Area, init), send_copy_reqs(Ps, Area, OReqLim, N+1). -send_copy_req(P, Area, GC) -> - erts_literal_area_collector:send_copy_request(P, Area, GC). +send_copy_req(P, Area, How) -> + erts_literal_area_collector:send_copy_request(P, Area, How). -spec release_area_switch() -> boolean(). 
release_area_switch() -> erlang:nif_error(undef). %% Implemented in beam_bif_load.c --spec send_copy_request(To, AreaId, GcAllowed) -> 'ok' when +-spec send_copy_request(To, AreaId, How) -> 'ok' when To :: pid(), AreaId :: term(), - GcAllowed :: boolean(). + How :: 'init' | 'check_gc' | 'need_gc'. -send_copy_request(_To, _AreaId, _GcAllowed) -> +send_copy_request(_To, _AreaId, _How) -> erlang:nif_error(undef). %% Implemented in beam_bif_load.c change_prio(From, Ref, Prio) -> |