summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRickard Green <rickard@erlang.org>2022-07-13 20:29:52 +0200
committerRickard Green <rickard@erlang.org>2022-07-13 20:29:52 +0200
commit9c63a7459cb4489bddd63be9468929cab709ab01 (patch)
tree9c81595b06601ccab1f6b29bbee2100a3f1f0217
parentc92c618ce7f44eea7f288bb9e783903ad7b2a577 (diff)
parentf002c5e89e0ba0329fc047d13591d79b4dada346 (diff)
downloaderlang-9c63a7459cb4489bddd63be9468929cab709ab01.tar.gz
Merge branch 'maint'
* maint: [erts] Yield when adjusting large message queues [erts] Improve flushing of signals
-rw-r--r--erts/emulator/beam/atom.names1
-rw-r--r--erts/emulator/beam/beam_bif_load.c61
-rw-r--r--erts/emulator/beam/bif.c75
-rw-r--r--erts/emulator/beam/bif.h10
-rw-r--r--erts/emulator/beam/erl_alloc.types1
-rw-r--r--erts/emulator/beam/erl_bif_info.c221
-rw-r--r--erts/emulator/beam/erl_bif_port.c89
-rw-r--r--erts/emulator/beam/erl_message.c6
-rw-r--r--erts/emulator/beam/erl_message.h1
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c731
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h77
-rw-r--r--erts/emulator/beam/erl_process.c178
-rw-r--r--erts/emulator/beam/erl_process.h12
-rw-r--r--erts/emulator/beam/erl_process_dump.c17
-rw-r--r--erts/emulator/beam/global.h1
-rw-r--r--erts/emulator/test/bif_SUITE.erl60
-rw-r--r--erts/emulator/test/literal_area_collector_test.erl51
-rw-r--r--erts/emulator/test/nif_SUITE.erl5
-rw-r--r--erts/emulator/test/port_SUITE.erl1
-rw-r--r--erts/emulator/test/process_SUITE.erl178
-rw-r--r--erts/emulator/test/signal_SUITE.erl314
-rw-r--r--erts/etc/unix/etp-commands.in2
-rw-r--r--erts/preloaded/ebin/erts_literal_area_collector.beambin5616 -> 5932 bytes
-rw-r--r--erts/preloaded/src/erts_literal_area_collector.erl45
24 files changed, 1776 insertions, 361 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index 09cc8fc6b5..b04a5f6052 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -172,6 +172,7 @@ atom cflags
atom CHANGE='CHANGE'
atom characters_to_binary_int
atom characters_to_list_int
+atom check_gc
atom clear
atom clock_service
atom close
diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c
index f910a4e8be..0e33dfa667 100644
--- a/erts/emulator/beam/beam_bif_load.c
+++ b/erts/emulator/beam/beam_bif_load.c
@@ -975,12 +975,39 @@ set_default_trace_pattern(Eterm module)
}
int
-erts_check_copy_literals_gc_need(Process *c_p, int *redsp,
- char *literals, Uint lit_bsize)
+erts_check_copy_literals_gc_need_max_reds(Process *c_p)
{
+ Uint64 words, reds;
+
/*
- * TODO: Implement yielding support!
+ * Calculate maximum amount of words that needs
+ * to be scanned...
*/
+ words = 1; /* fvalue */
+ words += c_p->hend - c_p->stop; /* stack */
+ words += c_p->htop - c_p->heap; /* new heap */
+ if (c_p->abandoned_heap)
+ words += c_p->heap_sz; /* abandoned heap */
+ words += c_p->old_htop - c_p->old_heap; /* old heap */
+ if (c_p->dictionary) {
+ Eterm* start = ERTS_PD_START(c_p->dictionary);
+ Eterm* end = start + ERTS_PD_SIZE(c_p->dictionary);
+
+ words += end - start; /* dictionary */
+ }
+ words += c_p->mbuf_sz; /* heap and message fragments */
+
+ /* Convert to reductions... */
+ reds = ((words - 1)/ERTS_CLA_SCAN_WORDS_PER_RED) + 1;
+ if (reds > CONTEXT_REDS)
+ return CONTEXT_REDS+1;
+ return (int) reds;
+}
+
+int
+erts_check_copy_literals_gc_need(Process *c_p, int *redsp,
+ char *literals, Uint lit_bsize)
+{
ErlHeapFragment *hfrag;
ErtsMessage *mfp;
Uint64 scanned = 0;
@@ -1529,22 +1556,30 @@ erts_literal_area_collector_send_copy_request_3(BIF_ALIST_3)
req_id = TUPLE3(&tmp_heap[0], BIF_ARG_2, BIF_ARG_3, BIF_ARG_1);
- if (BIF_ARG_3 == am_false) {
+ switch (BIF_ARG_3) {
+
+ case am_init:
/*
- * Will handle signal queue and check if GC if needed. If
- * GC is needed operation will be continued by a GC (below).
+     * Will handle signal queue and if possible check if GC is needed.
+ * If GC is needed or needs to be checked the operation will be
+ * restarted later in the 'check_gc' or 'need_gc' case below...
*/
erts_proc_sig_send_cla_request(BIF_P, BIF_ARG_1, req_id);
- }
- else if (BIF_ARG_3 == am_true) {
+ break;
+
+ case am_check_gc:
+ case am_need_gc:
/*
- * Will perform a literal GC. Note that this assumes that
- * signal queue already has been handled...
+ * Will check and/or perform a literal GC. Note that this assumes that
+ * signal queue already has been handled by 'init' case above...
*/
- erts_schedule_cla_gc(BIF_P, BIF_ARG_1, req_id);
- }
- else
+ erts_schedule_cla_gc(BIF_P, BIF_ARG_1, req_id,
+ BIF_ARG_3 == am_check_gc);
+ break;
+
+ default:
BIF_ERROR(BIF_P, BADARG);
+ }
BIF_RET(am_ok);
}
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 385866f279..e9d8d920fc 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -1523,7 +1523,7 @@ erts_internal_await_exit_trap(BIF_ALIST_0)
* terminated in order to ensure that signal order
* is preserved. Yield if necessary.
*/
- erts_aint32_t state;
+ erts_aint32_t state = erts_atomic32_read_nob(&BIF_P->state);
int reds = ERTS_BIF_REDS_LEFT(BIF_P);
(void) erts_proc_sig_handle_incoming(BIF_P, &state, &reds,
reds, !0);
@@ -5397,45 +5397,52 @@ static BIF_RETTYPE bif_return_trap(BIF_ALIST_2)
}
static BIF_RETTYPE
-bif_handle_signals_return(BIF_ALIST_1)
+bif_handle_signals_return(BIF_ALIST_2)
{
- int local_only = BIF_P->sig_qs.flags & FS_LOCAL_SIGS_ONLY;
- int sres, sreds, reds_left;
+ int reds_left;
erts_aint32_t state;
- reds_left = ERTS_BIF_REDS_LEFT(BIF_P);
- sreds = reds_left;
-
- if (!local_only) {
- erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MSGQ);
- erts_proc_sig_fetch(BIF_P);
- erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MSGQ);
+ if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS) {
+ flushed:
+ ASSERT(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS);
+ BIF_P->sig_qs.flags &= ~(FS_FLUSHED_SIGS|FS_FLUSHING_SIGS);
+ erts_set_gc_state(BIF_P, !0); /* Allow GC again... */
+ BIF_RET(BIF_ARG_2);
}
+
+ if (!(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS)) {
+ int flags = ((is_internal_pid(BIF_ARG_1)
+ || is_internal_port(BIF_ARG_1))
+ ? ERTS_PROC_SIG_FLUSH_FLG_FROM_ID
+ : ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL);
+ erts_proc_sig_init_flush_signals(BIF_P, flags, BIF_ARG_1);
+ if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS)
+ goto flushed;
+ }
+
+ ASSERT(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS);
- state = erts_atomic32_read_nob(&BIF_P->state);
- sres = erts_proc_sig_handle_incoming(BIF_P, &state, &sreds,
- sreds, !0);
-
- BUMP_REDS(BIF_P, (int) sreds);
- reds_left -= sreds;
+ reds_left = ERTS_BIF_REDS_LEFT(BIF_P);
- if (state & ERTS_PSFLG_EXITING) {
- BIF_P->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY;
- ERTS_BIF_EXITED(BIF_P);
- }
- if (!sres | (reds_left <= 0)) {
- /*
- * More signals to handle or out of reds; need
- * to yield and continue. Prevent fetching of
- * more signals by setting local-sigs-only flag.
- */
- BIF_P->sig_qs.flags |= FS_LOCAL_SIGS_ONLY;
- ERTS_BIF_YIELD1(&erts_bif_handle_signals_return_export,
- BIF_P, BIF_ARG_1);
- }
+ state = erts_atomic32_read_nob(&BIF_P->state);
+ do {
+ int sreds = reds_left;
+ (void) erts_proc_sig_handle_incoming(BIF_P, &state, &sreds,
+ sreds, !0);
+ BUMP_REDS(BIF_P, (int) sreds);
+ if (state & ERTS_PSFLG_EXITING)
+ ERTS_BIF_EXITED(BIF_P);
+ if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS)
+ goto flushed;
+ reds_left -= sreds;
+ } while (reds_left > 0);
- BIF_P->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY;
- BIF_RET(BIF_ARG_1);
+ /*
+ * More signals to handle, but out of reductions. Yield
+ * and come back here and continue...
+ */
+ ERTS_BIF_YIELD2(&erts_bif_handle_signals_return_export,
+ BIF_P, BIF_ARG_1, BIF_ARG_2);
}
Export bif_return_trap_export;
@@ -5477,7 +5484,7 @@ void erts_init_bif(void)
&bif_return_trap);
erts_init_trap_export(&erts_bif_handle_signals_return_export,
- am_erlang, am_bif_handle_signals_return, 1,
+ am_erlang, am_bif_handle_signals_return, 2,
&bif_handle_signals_return);
erts_await_result = erts_export_put(am_erts_internal,
diff --git a/erts/emulator/beam/bif.h b/erts/emulator/beam/bif.h
index 350f2ad430..a89ea5d4f2 100644
--- a/erts/emulator/beam/bif.h
+++ b/erts/emulator/beam/bif.h
@@ -522,12 +522,12 @@ do { \
extern Export erts_bif_handle_signals_return_export;
-#define ERTS_BIF_HANDLE_SIGNALS_RETURN(P, VAL) \
- BIF_TRAP1(&erts_bif_handle_signals_return_export, (P), (VAL))
+#define ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(P, FROM, VAL) \
+ BIF_TRAP2(&erts_bif_handle_signals_return_export, (P), (FROM), (VAL))
-#define ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(Ret, P, Val) \
- ERTS_BIF_PREP_TRAP1((Ret), &erts_bif_handle_signals_return_export, \
- (P), (Val))
+#define ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(Ret, P, From, Val) \
+ ERTS_BIF_PREP_TRAP2((Ret), &erts_bif_handle_signals_return_export, \
+ (P), (From), (Val))
#define ERTS_BIF_PREP_EXITED(RET, PROC) \
do { \
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index 4aed5d9b34..24ba019075 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -274,6 +274,7 @@ type THR_PRGR_DATA LONG_LIVED SYSTEM thr_prgr_data
type T_THR_PRGR_DATA SHORT_LIVED SYSTEM temp_thr_prgr_data
type RELEASE_LAREA SHORT_LIVED SYSTEM release_literal_area
type SIG_DATA SHORT_LIVED PROCESSES signal_data
+type SIG_YIELD_DATA SHORT_LIVED PROCESSES signal_yield_data
type DIST_DEMONITOR SHORT_LIVED PROCESSES dist_demonitor
type CML_CLEANUP SHORT_LIVED SYSTEM connection_ml_cleanup
type ML_YIELD_STATE SHORT_LIVED SYSTEM monitor_link_yield_state
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 2c27d460fd..430c4f1d80 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -1007,6 +1007,7 @@ process_info_aux(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int item_ix,
+ Sint *msgq_len_p,
int flags,
Uint *reserve_sizep,
Uint *reds);
@@ -1025,11 +1026,12 @@ erts_process_info(Process *c_p,
Eterm res;
Eterm part_res[ERTS_PI_ARGS];
int item_ix_ix, ix;
+ Sint msgq_len = -1;
if (ERTS_PI_FLAG_SINGELTON & flags) {
ASSERT(item_ix_len == 1);
res = process_info_aux(c_p, hfact, rp, rp_locks, item_ix[0],
- flags, &reserve_size, reds);
+ &msgq_len, flags, &reserve_size, reds);
return res;
}
@@ -1047,7 +1049,7 @@ erts_process_info(Process *c_p,
ix = pi_arg2ix(am_messages);
ASSERT(part_res[ix] == THE_NON_VALUE);
res = process_info_aux(c_p, hfact, rp, rp_locks, ix,
- flags, &reserve_size, reds);
+ &msgq_len, flags, &reserve_size, reds);
ASSERT(res != am_undefined);
ASSERT(res != THE_NON_VALUE);
part_res[ix] = res;
@@ -1057,7 +1059,7 @@ erts_process_info(Process *c_p,
ix = item_ix[item_ix_ix];
if (part_res[ix] == THE_NON_VALUE) {
res = process_info_aux(c_p, hfact, rp, rp_locks, ix,
- flags, &reserve_size, reds);
+ &msgq_len, flags, &reserve_size, reds);
ASSERT(res != am_undefined);
ASSERT(res != THE_NON_VALUE);
part_res[ix] = res;
@@ -1092,6 +1094,92 @@ erts_process_info(Process *c_p,
static void
pi_setup_grow(int **arr, int *def_arr, Uint *sz, int ix);
+static ERTS_INLINE int
+pi_maybe_flush_signals(Process *c_p, int pi_flags)
+{
+ int reds_left;
+ erts_aint32_t state;
+
+ /*
+     * pi_maybe_flush_signals() flushes signals in the caller's
+     * signal queue for two different reasons:
+ *
+ * 1. If we need 'message_queue_len', but not 'messages', we need
+ * to handle all signals in the middle queue in order for
+ * 'c_p->sig_qs.len' to reflect the amount of messages in the
+ * message queue. We could count traverse the queues, but it
+ * is better to handle all signals in the queue instead since
+ * this is work we anyway need to do at some point.
+ *
+ * 2. Ensures that all signals that the caller might have sent to
+ * itself are handled before we gather information.
+ *
+ * This is, however, not strictly necessary. process_info() is
+ * not documented to send itself a signal when gathering
+ * information about itself. That is, the operation is not
+ * bound by the signal order guarantee when gathering
+ * information about itself. If we do not handle outstanding
+ * signals before gathering the information, outstanding signals
+ * from the caller to itself will not be part of the result.
+ * This would not be wrong, but perhaps surprising for the user.
+ * We continue doing it this way for now, since this is how it
+ * has been done for a very long time. We should, however,
+ * consider changing this in a future release, since this signal
+ * handling is not for free, although quite cheap since these
+ * signals anyway must be handled at some point.
+ */
+
+ if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) {
+ flushed:
+
+ ASSERT(((pi_flags & (ERTS_PI_FLAG_WANT_MSGS
+ | ERTS_PI_FLAG_NEED_MSGQ_LEN))
+ != ERTS_PI_FLAG_NEED_MSGQ_LEN)
+ || !c_p->sig_qs.cont);
+ ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);
+
+ c_p->sig_qs.flags &= ~(FS_FLUSHED_SIGS|FS_FLUSHING_SIGS);
+ erts_set_gc_state(c_p, !0); /* Allow GC again... */
+ return 0; /* done, all signals handled... */
+ }
+
+ state = erts_atomic32_read_nob(&c_p->state);
+
+ if (!(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) {
+ int flush_flags = 0;
+ if (((pi_flags & (ERTS_PI_FLAG_WANT_MSGS
+ | ERTS_PI_FLAG_NEED_MSGQ_LEN))
+ == ERTS_PI_FLAG_NEED_MSGQ_LEN)
+ && c_p->sig_qs.cont) {
+ flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ;
+ }
+ if (state & ERTS_PSFLG_MAYBE_SELF_SIGS)
+ flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_FROM_ID;
+ if (!flush_flags)
+ return 0; /* done; no need to flush... */
+ erts_proc_sig_init_flush_signals(c_p, flush_flags, c_p->common.id);
+ if (c_p->sig_qs.flags & FS_FLUSHED_SIGS)
+ goto flushed;
+ }
+
+ ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);
+ reds_left = ERTS_BIF_REDS_LEFT(c_p);
+
+ do {
+ int sreds = reds_left;
+ (void) erts_proc_sig_handle_incoming(c_p, &state, &sreds,
+ sreds, !0);
+ BUMP_REDS(c_p, (int) sreds);
+ if (state & ERTS_PSFLG_EXITING)
+ return -1; /* process exiting... */
+ if (c_p->sig_qs.flags & FS_FLUSHED_SIGS)
+ goto flushed;
+ reds_left -= sreds;
+ } while (reds_left > 0);
+
+ return 1; /* yield and continue here later... */
+}
+
static BIF_RETTYPE
process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
{
@@ -1110,41 +1198,6 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
ERTS_CT_ASSERT(ERTS_PI_DEF_ARR_SZ > 0);
- if (c_p->common.id == pid) {
- int local_only = c_p->sig_qs.flags & FS_LOCAL_SIGS_ONLY;
- int sres, sreds, reds_left;
-
- reds_left = ERTS_BIF_REDS_LEFT(c_p);
- sreds = reds_left;
-
- if (!local_only) {
- erts_proc_sig_queue_lock(c_p);
- erts_proc_sig_fetch(c_p);
- erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
- }
-
- sres = erts_proc_sig_handle_incoming(c_p, &state, &sreds, sreds, !0);
-
- BUMP_REDS(c_p, (int) sreds);
- reds_left -= sreds;
-
- if (state & ERTS_PSFLG_EXITING) {
- c_p->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY;
- goto exited;
- }
- if (!sres | (reds_left <= 0)) {
- /*
- * More signals to handle or out of reds; need
- * to yield and continue. Prevent fetching of
- * more signals by setting local-sigs-only flag.
- */
- c_p->sig_qs.flags |= FS_LOCAL_SIGS_ONLY;
- goto yield;
- }
-
- c_p->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY;
- }
-
if (is_atom(opt)) {
int ix = pi_arg2ix(opt);
item_ix[0] = ix;
@@ -1190,7 +1243,16 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
goto badarg;
}
- if (is_not_internal_pid(pid)) {
+ if (c_p->common.id == pid) {
+ int res = pi_maybe_flush_signals(c_p, flags);
+ if (res != 0) {
+ if (res > 0)
+ goto yield;
+ else
+ goto exited;
+ }
+ }
+ else if (is_not_internal_pid(pid)) {
if (is_external_pid(pid)
&& external_pid_dist_entry(pid) == erts_this_dist_entry)
goto undefined;
@@ -1226,6 +1288,10 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
}
if (flags & ERTS_PI_FLAG_NEED_MSGQ_LEN) {
ASSERT(locks & ERTS_PROC_LOCK_MAIN);
+ if (rp->sig_qs.flags & FS_FLUSHING_SIGS) {
+ erts_proc_unlock(rp, locks);
+ goto send_signal;
+ }
erts_proc_sig_queue_lock(rp);
erts_proc_sig_fetch(rp);
if (rp->sig_qs.cont) {
@@ -1264,7 +1330,8 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
if (c_p == rp || !ERTS_PROC_HAS_INCOMING_SIGNALS(c_p))
ERTS_BIF_PREP_RET(ret, res);
else
- ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(ret, c_p, res);
+ ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(ret, c_p,
+ pid, res);
done:
@@ -1355,6 +1422,7 @@ process_info_aux(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int item_ix,
+ Sint *msgq_len_p,
int flags,
Uint *reserve_sizep,
Uint *reds)
@@ -1468,8 +1536,10 @@ process_info_aux(Process *c_p,
case ERTS_PI_IX_MESSAGES: {
ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN);
- if (rp->sig_qs.len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE))
+ if (rp->sig_qs.len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE)) {
+ *msgq_len_p = 0;
res = NIL;
+ }
else {
int info_on_self = !(flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER);
ErtsMessageInfo *mip;
@@ -1487,8 +1557,8 @@ process_info_aux(Process *c_p,
heap_need = erts_proc_sig_prep_msgq_for_inspection(c_p, rp,
rp_locks,
info_on_self,
- mip);
- len = rp->sig_qs.len;
+ mip, msgq_len_p);
+ len = *msgq_len_p;
heap_need += len*2; /* Cons cells */
@@ -1517,9 +1587,13 @@ process_info_aux(Process *c_p,
}
case ERTS_PI_IX_MESSAGE_QUEUE_LEN: {
- Sint len = rp->sig_qs.len;
+ Sint len = *msgq_len_p;
+ if (len < 0) {
+ ASSERT((flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER)
+ || !rp->sig_qs.cont);
+ len = rp->sig_qs.len;
+ }
ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN);
- ASSERT((flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER) || !rp->sig_qs.cont);
ASSERT(len >= 0);
if (len <= MAX_SMALL)
res = make_small(len);
@@ -3690,42 +3764,54 @@ BIF_RETTYPE erts_internal_is_process_alive_2(BIF_ALIST_2)
BIF_ERROR(BIF_P, BADARG);
if (!erts_proc_sig_send_is_alive_request(BIF_P, BIF_ARG_1, BIF_ARG_2)) {
if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, am_ok);
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, BIF_ARG_1, am_ok);
}
BIF_RET(am_ok);
}
BIF_RETTYPE is_process_alive_1(BIF_ALIST_1)
{
+
if (is_internal_pid(BIF_ARG_1)) {
- erts_aint32_t state;
+ BIF_RETTYPE result;
Process *rp;
if (BIF_ARG_1 == BIF_P->common.id)
BIF_RET(am_true);
rp = erts_proc_lookup_raw(BIF_ARG_1);
- if (!rp)
- BIF_RET(am_false);
+ if (!rp) {
+ result = am_false;
+ }
+ else {
+ erts_aint32_t state = erts_atomic32_read_acqb(&rp->state);
+ if (state & (ERTS_PSFLG_EXITING
+ | ERTS_PSFLG_SIG_Q
+ | ERTS_PSFLG_SIG_IN_Q)) {
+ /*
+ * If in exiting state, trap out and send 'is alive'
+ * request and wait for it to complete termination.
+ *
+ * If process has signals enqueued, we need to
+ * send it an 'is alive' request via its signal
+ * queue in order to ensure that signal order is
+ * preserved (we may earlier have sent it an
+ * exit signal that has not been processed yet).
+ */
+ BIF_TRAP1(is_process_alive_trap, BIF_P, BIF_ARG_1);
+ }
- state = erts_atomic32_read_acqb(&rp->state);
- if (state & (ERTS_PSFLG_EXITING
- | ERTS_PSFLG_SIG_Q
- | ERTS_PSFLG_SIG_IN_Q)) {
+ result = am_true;
+ }
+
+ if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) {
/*
- * If in exiting state, trap out and send 'is alive'
- * request and wait for it to complete termination.
- *
- * If process has signals enqueued, we need to
- * send it an 'is alive' request via its signal
- * queue in order to ensure that signal order is
- * preserved (we may earlier have sent it an
- * exit signal that has not been processed yet).
+ * Ensure that signal order of signals from inspected
+ * process to us is preserved...
*/
- BIF_TRAP1(is_process_alive_trap, BIF_P, BIF_ARG_1);
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, BIF_ARG_1, result);
}
-
- BIF_RET(am_true);
+ BIF_RET(result);
}
if (is_external_pid(BIF_ARG_1)) {
@@ -3734,6 +3820,8 @@ BIF_RETTYPE is_process_alive_1(BIF_ALIST_1)
}
BIF_ERROR(BIF_P, BADARG);
+
+
}
static Eterm
@@ -4218,10 +4306,10 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
ERTS_ASSERT_IS_NOT_EXITING(BIF_P);
BIF_RET(am_undefined);
}
-
erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_fetch(p);
erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ state = erts_atomic32_read_nob(&BIF_P->state);
do {
int reds = CONTEXT_REDS;
sigs_done = erts_proc_sig_handle_incoming(p,
@@ -4284,10 +4372,11 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
ERTS_ASSERT_IS_NOT_EXITING(BIF_P);
BIF_RET(am_undefined);
}
-
+
erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_fetch(p);
erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ state = erts_atomic32_read_nob(&BIF_P->state);
do {
int reds = CONTEXT_REDS;
sigs_done = erts_proc_sig_handle_incoming(p,
diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c
index fc415aa4e5..737a6be15f 100644
--- a/erts/emulator/beam/erl_bif_port.c
+++ b/erts/emulator/beam/erl_bif_port.c
@@ -238,8 +238,17 @@ BIF_RETTYPE erts_internal_port_command_3(BIF_ALIST_3)
}
else {
/* Ensure signal order is preserved... */
- if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))
- ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(res, BIF_P, res);
+ if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(res, BIF_P,
+ from, res);
+ }
}
return res;
@@ -287,8 +296,16 @@ BIF_RETTYPE erts_internal_port_call_3(BIF_ALIST_3)
ERTS_BIF_EXITED(BIF_P);
else {
/* Ensure signal order is preserved... */
- if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
}
BIF_RET(retval);
@@ -335,8 +352,16 @@ BIF_RETTYPE erts_internal_port_control_3(BIF_ALIST_3)
ERTS_BIF_EXITED(BIF_P);
else {
/* Ensure signal order is preserved... */
- if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
}
BIF_RET(retval);
@@ -382,8 +407,16 @@ BIF_RETTYPE erts_internal_port_close_1(BIF_ALIST_1)
}
/* Ensure signal order is preserved... */
- if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
BIF_RET(retval);
}
@@ -426,8 +459,16 @@ BIF_RETTYPE erts_internal_port_connect_2(BIF_ALIST_2)
}
/* Ensure signal order is preserved... */
- if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
BIF_RET(retval);
}
@@ -435,7 +476,7 @@ BIF_RETTYPE erts_internal_port_connect_2(BIF_ALIST_2)
BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1)
{
Eterm retval;
- Port* prt;
+ Port* prt = NULL;
if (is_internal_port(BIF_ARG_1) || is_atom(BIF_ARG_1)) {
prt = sig_lookup_port(BIF_P, BIF_ARG_1);
@@ -474,8 +515,16 @@ BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1)
}
/* Ensure signal order is preserved... */
- if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
BIF_RET(retval);
}
@@ -484,7 +533,7 @@ BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1)
BIF_RETTYPE erts_internal_port_info_2(BIF_ALIST_2)
{
Eterm retval;
- Port* prt;
+ Port* prt = NULL;
if (is_internal_port(BIF_ARG_1) || is_atom(BIF_ARG_1)) {
prt = sig_lookup_port(BIF_P, BIF_ARG_1);
@@ -523,8 +572,16 @@ BIF_RETTYPE erts_internal_port_info_2(BIF_ALIST_2)
}
/* Ensure signal order is preserved... */
- if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P))
- ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval);
+ if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) {
+ Eterm from;
+ if (is_internal_port(BIF_ARG_1))
+ from = BIF_ARG_1;
+ else if (prt)
+ from = prt->common.id;
+ else
+ from = NIL;
+ ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval);
+ }
BIF_RET(retval);
}
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 151d0b41ad..0026ecce99 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -539,6 +539,9 @@ erts_queue_proc_message(Process* sender,
Process* receiver, ErtsProcLocks receiver_locks,
ErtsMessage* mp, Eterm msg)
{
+ if (sender == receiver)
+ (void) erts_atomic32_read_bor_nob(&sender->state,
+ ERTS_PSFLG_MAYBE_SELF_SIGS);
ERL_MESSAGE_TERM(mp) = msg;
ERL_MESSAGE_FROM(mp) = sender->common.id;
queue_messages(sender->common.id, receiver, receiver_locks,
@@ -552,6 +555,9 @@ erts_queue_proc_messages(Process* sender,
Process* receiver, ErtsProcLocks receiver_locks,
ErtsMessage* first, ErtsMessage** last, Uint len)
{
+ if (sender == receiver)
+ (void) erts_atomic32_read_bor_nob(&sender->state,
+ ERTS_PSFLG_MAYBE_SELF_SIGS);
queue_messages(sender->common.id, receiver, receiver_locks,
prepend_pending_sig_maybe(sender, receiver, first),
last, len);
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index 87cd96f4cd..3a7aba5a68 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -284,6 +284,7 @@ typedef struct {
typedef struct {
ErtsSignal sig;
ErtsMessage **prev_next;
+ signed char is_yield_mark;
signed char pass;
signed char set_save;
signed char in_sigq;
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 840c3cea23..c5caac0bce 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -216,11 +216,26 @@ typedef struct {
} ErtsProcSigRPC;
typedef struct {
+ ErtsRecvMarker next;
+ ErtsRecvMarker last;
+} ErtsYieldAdjMsgQ;
+
+typedef struct {
+ ErtsYieldAdjMsgQ *yield;
Eterm requester;
Eterm request_id;
} ErtsCLAData;
-static void wait_handle_signals(Process *c_p);
+typedef struct {
+ ErtsYieldAdjMsgQ *yield;
+} ErtsAdjOffHeapMsgQData;
+
+typedef struct {
+ ErtsMessage *first;
+ ErtsMessage **last;
+} ErtsSavedNMSignals;
+
+static erts_aint32_t wait_handle_signals(Process *c_p);
static void wake_handle_signals(Process *proc);
static int handle_msg_tracing(Process *c_p,
@@ -243,12 +258,17 @@ static int
handle_cla(Process *c_p,
ErtsMessage *sig,
ErtsMessage ***next_nm_sig,
- int exiting);
+ int exiting,
+ int limit,
+ ErtsSavedNMSignals *saved_nm_sigs);
+
static int
handle_move_msgq_off_heap(Process *c_p,
ErtsMessage *sig,
ErtsMessage ***next_nm_sig,
- int exiting);
+ int exiting,
+ int limit,
+ ErtsSavedNMSignals *saved_nm_sigs);
static void
send_cla_reply(Process *c_p, ErtsMessage *sig, Eterm to,
Eterm req_id, Eterm result);
@@ -293,6 +313,76 @@ proc_sig_hdbg_check_queue(Process *c_p,
#define ERTS_PROC_SIG_HDBG_PRIV_CHKQ(P, T, NMN)
#endif
+static void
+save_delayed_nm_signal(ErtsSavedNMSignals *saved_sigs, ErtsMessage *sig)
+{
+ ErtsSignal *nm_sig = (ErtsSignal *) sig;
+ nm_sig->common.next = NULL;
+ nm_sig->common.specific.next = NULL;
+ if (!saved_sigs->first) {
+ ASSERT(!saved_sigs->last);
+ saved_sigs->first = sig;
+ saved_sigs->last = &saved_sigs->first;
+ }
+ else {
+ ErtsSignal *last;
+ ASSERT(saved_sigs->last);
+ last = (ErtsSignal *) *saved_sigs->last;
+ last->common.next = sig;
+ last->common.specific.next = &last->common.next;
+ saved_sigs->last = &last->common.next;
+ }
+}
+
+static erts_aint32_t
+restore_delayed_nm_signals(Process *c_p, ErtsSavedNMSignals *saved_sigs)
+{
+ erts_aint32_t state;
+ ErtsSignal *lsig;
+
+ ASSERT(saved_sigs->first && saved_sigs->last);
+
+ lsig = (ErtsSignal *) *saved_sigs->last;
+ if (!c_p->sig_qs.cont) {
+ ASSERT(!c_p->sig_qs.nmsigs.next);
+ ASSERT(!c_p->sig_qs.nmsigs.last);
+ if (saved_sigs->last == &saved_sigs->first)
+ c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont;
+ else
+ c_p->sig_qs.nmsigs.last = saved_sigs->last;
+ c_p->sig_qs.cont_last = &lsig->common.next;
+ }
+ else {
+ lsig->common.next = c_p->sig_qs.cont;
+ if (c_p->sig_qs.nmsigs.next) {
+ ASSERT(c_p->sig_qs.nmsigs.last);
+ if (c_p->sig_qs.nmsigs.next == &c_p->sig_qs.cont)
+ lsig->common.specific.next = &lsig->common.next;
+ else
+ lsig->common.specific.next = c_p->sig_qs.nmsigs.next;
+ if (c_p->sig_qs.nmsigs.last == &c_p->sig_qs.cont)
+ c_p->sig_qs.nmsigs.last = &lsig->common.next;
+ }
+ else {
+ ASSERT(!c_p->sig_qs.nmsigs.last);
+ if (saved_sigs->last == &saved_sigs->first)
+ c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont;
+ else
+ c_p->sig_qs.nmsigs.last = saved_sigs->last;
+ if (c_p->sig_qs.cont_last == &c_p->sig_qs.cont)
+ c_p->sig_qs.cont_last = &lsig->common.next;
+ }
+ }
+
+ c_p->sig_qs.cont = saved_sigs->first;
+ c_p->sig_qs.nmsigs.next = &c_p->sig_qs.cont;
+
+ state = erts_atomic32_read_bor_nob(&c_p->state,
+ ERTS_PSFLG_SIG_Q);
+ state |= ERTS_PSFLG_SIG_Q;
+ return state;
+}
+
typedef struct {
ErtsSignalCommon common;
Eterm ref;
@@ -389,6 +479,18 @@ get_cla_data(ErtsMessage *sig)
+ sig->hfrag.used_size);
}
+static ERTS_INLINE ErtsAdjOffHeapMsgQData *
+get_move_msgq_off_heap_data(ErtsMessage *sig)
+{
+ ASSERT(ERTS_SIG_IS_NON_MSG(sig));
+ ASSERT(ERTS_PROC_SIG_OP(((ErtsSignal *) sig)->common.tag)
+ == ERTS_SIG_Q_OP_ADJ_MSGQ);
+ ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag)
+ == ERTS_SIG_Q_TYPE_OFF_HEAP);
+ return (ErtsAdjOffHeapMsgQData *) (char *) (&sig->hfrag.mem[0]
+ + sig->hfrag.used_size);
+}
+
static ERTS_INLINE void
destroy_trace_info(ErtsSigTraceInfo *ti)
{
@@ -598,7 +700,7 @@ enqueue_signals(Process *rp, ErtsMessage *first,
this = dest_queue->last;
- if ( ! is_to_buffer ){
+ if (!is_to_buffer) {
ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp);
}
@@ -664,7 +766,9 @@ enqueue_signals(Process *rp, ErtsMessage *first,
dest_queue->len += num_msgs;
- ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp);
+ if (!is_to_buffer) {
+ ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp);
+ }
return state;
}
@@ -729,7 +833,7 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid,
/* Tracing requires sender for local procs and ports. The assertions below
* will not catch errors after time-of-death, but ought to find most
* problems. */
- ASSERT(sender != NULL ||
+ ASSERT(sender != NULL || op == ERTS_SIG_Q_OP_FLUSH ||
(is_normal_sched && esdp->pending_signal.sig == sig) ||
(!(is_internal_pid(from) &&
erts_proc_lookup(from) != NULL) &&
@@ -824,6 +928,11 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid,
sigp = &first;
first_last_done:
+
+ if ((void *) sender == (void *) rp)
+ (void) erts_atomic32_read_bor_nob(&((Process *) sender)->state,
+ ERTS_PSFLG_MAYBE_SELF_SIGS);
+
sig->common.specific.next = NULL;
/* may add signals before sig */
@@ -1463,22 +1572,17 @@ erts_proc_sig_cleanup_non_msg_signal(ErtsMessage *sig)
Eterm tag = ((ErtsSignal *) sig)->common.tag;
/*
- * Heap alias message, heap frag alias message and
- * adjust message queue signals are the only non-message
- * signals, which are allocated as messages, which do not
- * use a combined message / heap fragment.
+ * Heap alias message and heap frag alias message are
+ * the only non-message signals, which are allocated as
+ * messages, which do not use a combined message / heap
+ * fragment.
*/
- if (ERTS_SIG_IS_HEAP_ALIAS_MSG_TAG(tag)
- || tag == ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ADJ_MSGQ,
- ERTS_SIG_Q_TYPE_OFF_HEAP,
- 0)) {
+ if (ERTS_SIG_IS_HEAP_ALIAS_MSG_TAG(tag)) {
sig->data.heap_frag = NULL;
return;
}
-
-
- if(ERTS_SIG_IS_HEAP_FRAG_ALIAS_MSG_TAG(tag)) {
+ if (ERTS_SIG_IS_HEAP_FRAG_ALIAS_MSG_TAG(tag)) {
/* Retrieve pointer to heap fragment (may not be NULL). */
void *attached;
(void) get_alias_msg_data(sig, NULL, NULL, NULL, &attached);
@@ -1489,10 +1593,45 @@ erts_proc_sig_cleanup_non_msg_signal(ErtsMessage *sig)
/*
* Using a combined heap fragment...
*/
- ErtsDistExternal *edep = get_external_non_msg_signal(sig);
- if (edep)
- erts_free_dist_ext_copy(edep);
-
+ switch (ERTS_PROC_SIG_OP(tag)) {
+
+ case ERTS_SIG_Q_OP_ADJ_MSGQ: {
+ /* We need to deallocate yield markers if such has been used... */
+ ErtsYieldAdjMsgQ *yp;
+ switch (ERTS_PROC_SIG_TYPE(tag)) {
+ case ERTS_SIG_Q_TYPE_CLA: {
+ ErtsCLAData *cla = get_cla_data(sig);
+ yp = cla->yield;
+ cla->yield = NULL;
+ break;
+ }
+ case ERTS_SIG_Q_TYPE_OFF_HEAP: {
+ ErtsAdjOffHeapMsgQData *ohdp = get_move_msgq_off_heap_data(sig);
+ yp = ohdp->yield;
+ ohdp->yield = NULL;
+ break;
+ }
+ default:
+ ERTS_INTERNAL_ERROR("Invalid adjust-message-queue signal type");
+ yp = NULL;
+ break;
+ }
+ if (yp) {
+ ASSERT(!yp->next.in_msgq && !yp->next.in_sigq);
+ ASSERT(!yp->last.in_msgq && !yp->last.in_sigq);
+ erts_free(ERTS_ALC_T_SIG_YIELD_DATA, yp);
+ }
+ break;
+ }
+
+ default: {
+ ErtsDistExternal *edep = get_external_non_msg_signal(sig);
+ if (edep)
+ erts_free_dist_ext_copy(edep);
+ break;
+ }
+ }
+
sig->data.attached = ERTS_MSG_COMBINED_HFRAG;
hfrag = sig->hfrag.next;
erts_cleanup_offheap(&sig->hfrag.off_heap);
@@ -2712,6 +2851,7 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id)
cla = (ErtsCLAData *) (char *) hp;
hfrag->used_size = hp - start_hp;
+ cla->yield = NULL;
cla->requester = c_p->common.id;
cla->request_id = req_id_cpy;
@@ -2733,8 +2873,20 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id)
void
erts_proc_sig_send_move_msgq_off_heap(Eterm to)
{
- ErtsMessage *sig = erts_alloc_message(0, NULL);
+ ErtsMessage *sig;
+ Eterm *hp;
+ Uint hsz;
+ ErtsAdjOffHeapMsgQData *ohdp;
ASSERT(is_internal_pid(to));
+
+ hsz = sizeof(ErtsAdjOffHeapMsgQData)/sizeof(Uint);
+ sig = erts_alloc_message(hsz, &hp);
+
+ ohdp = (ErtsAdjOffHeapMsgQData *) (char *) hp;
+ ohdp->yield = NULL;
+
+ sig->hfrag.used_size = 0;
+
ERL_MESSAGE_TERM(sig) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ADJ_MSGQ,
ERTS_SIG_Q_TYPE_OFF_HEAP,
0);
@@ -2751,6 +2903,83 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to)
}
}
+void
+erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm id)
+{
+ int force_flush_buffers = 0, enqueue_mq, fetch_sigs;
+ ErtsSignal *sig;
+
+ ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
+
+ ASSERT(!(c_p->sig_qs.flags & (FS_FLUSHING_SIGS|FS_FLUSHED_SIGS)));
+ ASSERT(flags);
+ ASSERT((flags & ~ERTS_PROC_SIG_FLUSH_FLGS) == 0);
+ ASSERT(!(flags & ERTS_PROC_SIG_FLUSH_FLG_FROM_ID)
+ || is_internal_pid(id) || is_internal_port(id));
+
+ sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSignalCommon));
+ sig->common.next = NULL;
+ sig->common.specific.attachment = NULL;
+ sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_FLUSH,
+ ERTS_SIG_Q_TYPE_UNDEFINED,
+ 0);
+ switch (flags) {
+ case ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL:
+ id = c_p->common.id;
+ force_flush_buffers = !0;
+ /* Fall through... */
+ case ERTS_PROC_SIG_FLUSH_FLG_FROM_ID:
+ if (!proc_queue_signal(NULL, id, c_p->common.id, sig,
+ force_flush_buffers, ERTS_SIG_Q_OP_FLUSH))
+ ERTS_INTERNAL_ERROR("Failed to send flush signal to ourselves");
+ enqueue_mq = 0;
+ fetch_sigs = !0;
+ break;
+ case ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ:
+ enqueue_mq = !0;
+ fetch_sigs = 0;
+ break;
+ default:
+ enqueue_mq = !!(flags & ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ);
+ fetch_sigs = !0;
+ break;
+ }
+
+ erts_set_gc_state(c_p, 0);
+
+ if (fetch_sigs) {
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(c_p);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ }
+
+ c_p->sig_qs.flags |= FS_FLUSHING_SIGS;
+
+ if (enqueue_mq) {
+ if (!c_p->sig_qs.cont) {
+ c_p->sig_qs.flags |= FS_FLUSHED_SIGS;
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ }
+ else {
+ if (!c_p->sig_qs.nmsigs.last) {
+ ASSERT(!c_p->sig_qs.nmsigs.next);
+ c_p->sig_qs.nmsigs.next = c_p->sig_qs.cont_last;
+ }
+ else {
+ ErtsSignal *lsig = (ErtsSignal *) *c_p->sig_qs.nmsigs.last;
+ ASSERT(c_p->sig_qs.nmsigs.next);
+ ASSERT(lsig && !lsig->common.specific.next);
+ lsig->common.specific.next = c_p->sig_qs.cont_last;
+ }
+
+ c_p->sig_qs.nmsigs.last = c_p->sig_qs.cont_last;
+ *c_p->sig_qs.cont_last = (ErtsMessage *) sig;
+ c_p->sig_qs.cont_last = &sig->common.next;
+ }
+ }
+
+}
+
static int
handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp)
{
@@ -2937,7 +3166,7 @@ remove_iq_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig)
static ERTS_INLINE void
remove_mq_sig(Process *c_p, ErtsMessage *sig,
- ErtsMessage **next_sig, ErtsMessage ***next_nm_sig)
+ ErtsMessage **next_sig, ErtsMessage ***next_nm_sig)
{
/*
* Remove signal from (middle) signal queue.
@@ -3165,6 +3394,8 @@ recv_marker_deallocate(Process *c_p, ErtsRecvMarker *markp)
ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk;
int ix, nix;
+ ASSERT(!markp->is_yield_mark);
+
ASSERT(blkp);
ERTS_HDBG_CHK_RECV_MRKS(c_p);
@@ -3215,6 +3446,7 @@ recv_marker_dequeue(Process *c_p, ErtsRecvMarker *markp)
{
ErtsMessage *sigp;
+ ASSERT(!markp->is_yield_mark);
ASSERT(markp->proc == c_p);
if (markp->in_sigq <= 0) {
@@ -3280,6 +3512,7 @@ recv_marker_alloc_block(Process *c_p, ErtsRecvMarkerBlock **blkpp,
/* Allocate marker for 'uniqp' in index zero... */
*ixp = 0;
blkp->ref[0] = recv_marker_uniq(c_p, uniqp);
+ blkp->marker[0].is_yield_mark = 0;
markp = &blkp->marker[0];
markp->next_ix = markp->prev_ix = 0;
blkp->used_ix = 0;
@@ -3295,6 +3528,7 @@ recv_marker_alloc_block(Process *c_p, ErtsRecvMarkerBlock **blkpp,
blkp->free_ix = 1;
for (ix = 1; ix < ERTS_RECV_MARKER_BLOCK_SIZE; ix++) {
blkp->ref[ix] = am_free;
+ blkp->marker[ix].is_yield_mark = 0;
if (ix == ERTS_RECV_MARKER_BLOCK_SIZE - 1)
blkp->marker[ix].next_ix = -1; /* End of list */
else
@@ -3562,20 +3796,29 @@ erts_msgq_recv_marker_create_insert_set_save(Process *c_p, Eterm id)
}
void
-erts_msgq_remove_leading_recv_markers(Process *c_p)
+erts_msgq_remove_leading_recv_markers_set_save_first(Process *c_p)
{
+ ErtsMessage **save;
/*
* Receive markers in the front of the queue does not
- * add any value, so we just remove them...
+ * add any value, so we just remove them. We need to
+ * keep and pass yield markers though...
*/
ASSERT(c_p->sig_qs.first
&& ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first));
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
+ save = &c_p->sig_qs.first;
do {
- ErtsRecvMarker *markp = (ErtsRecvMarker *) c_p->sig_qs.first;
- recv_marker_dequeue(c_p, markp);
- } while (c_p->sig_qs.first
- && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first));
+ ErtsRecvMarker *markp = (ErtsRecvMarker *) *save;
+ if (markp->is_yield_mark)
+ save = &markp->sig.common.next;
+ else
+ recv_marker_dequeue(c_p, markp);
+ } while (*save && ERTS_SIG_IS_RECV_MARKER(*save));
+
+ c_p->sig_qs.save = save;
+
+ ASSERT(!*save || ERTS_SIG_IS_MSG(*save));
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
}
@@ -3587,7 +3830,8 @@ erts_msgq_pass_recv_markers(Process *c_p, ErtsMessage **markpp)
ASSERT(ERTS_SIG_IS_RECV_MARKER(sigp));
do {
ErtsRecvMarker *markp = (ErtsRecvMarker *) sigp;
- if (++markp->pass > ERTS_RECV_MARKER_PASS_MAX) {
+ if (!markp->is_yield_mark
+ && ++markp->pass > ERTS_RECV_MARKER_PASS_MAX) {
recv_marker_dequeue(c_p, markp);
sigp = *sigpp;
}
@@ -5147,22 +5391,22 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
int *redsp, int max_reds, int local_only)
{
Eterm tag;
- erts_aint32_t state;
+ erts_aint32_t state = *statep;
int yield, cnt, limit, abs_lim, msg_tracing, save_in_msgq;
ErtsMessage *sig, ***next_nm_sig;
ErtsSigRecvTracing tracing;
-
+ ErtsSavedNMSignals delayed_nm_signals = {0};
+
ASSERT(!(c_p->sig_qs.flags & FS_WAIT_HANDLE_SIGS));
if (c_p->sig_qs.flags & FS_HANDLING_SIGS)
- wait_handle_signals(c_p);
+ state = wait_handle_signals(c_p);
else
c_p->sig_qs.flags |= FS_HANDLING_SIGS;
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
- state = erts_atomic32_read_nob(&c_p->state);
- if (!local_only) {
+ if (!local_only && !(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) {
if (ERTS_PSFLG_SIG_IN_Q & state) {
erts_proc_sig_queue_lock(c_p);
erts_proc_sig_fetch(c_p);
@@ -5674,21 +5918,57 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
break;
- case ERTS_SIG_Q_OP_ADJ_MSGQ:
+ case ERTS_SIG_Q_OP_ADJ_MSGQ: {
+ int adj_limit, adj_cnt, min_adj_limit;
+ /*
+ * This may require a substantial amount of work and we
+ * want to get it over and done with in a reasonable
+ * amount of time, so we bump up the limit for it a bit...
+ */
+ min_adj_limit = ERTS_SIG_REDS_CNT_FACTOR*CONTEXT_REDS/6;
+ if (sig->next)
+ adj_limit = min_adj_limit;
+ else {
+ adj_limit = limit - cnt;
+ if (adj_limit < min_adj_limit)
+ adj_limit = min_adj_limit;
+ }
ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
switch (ERTS_PROC_SIG_TYPE(tag)) {
case ERTS_SIG_Q_TYPE_CLA:
- cnt += handle_cla(c_p, sig, next_nm_sig, 0);
+ adj_cnt = handle_cla(c_p, sig, next_nm_sig, 0, adj_limit,
+ &delayed_nm_signals);
break;
case ERTS_SIG_Q_TYPE_OFF_HEAP:
- cnt += handle_move_msgq_off_heap(c_p, sig, next_nm_sig, 0);
+ adj_cnt = handle_move_msgq_off_heap(c_p, sig, next_nm_sig,
+ 0, adj_limit,
+ &delayed_nm_signals);
break;
default:
- ERTS_INTERNAL_ERROR("Invalid 'adjust-message-queue' signal type");
+ ERTS_INTERNAL_ERROR("Invalid adjust-message-queue signal type");
break;
}
+ cnt += adj_cnt;
+ limit += adj_cnt;
+ if (limit > abs_lim)
+ abs_lim = limit;
ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
break;
+ }
+
+ case ERTS_SIG_Q_OP_FLUSH:
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);
+ c_p->sig_qs.flags |= FS_FLUSHED_SIGS;
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ /*
+ * The caller has been exclusively handling signals until this
+ * point. Break out and let the process continue with other
+ * things as well...
+ */
+ goto stop;
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: {
Uint16 type = ERTS_PROC_SIG_TYPE(tag);
@@ -5719,6 +5999,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
case ERTS_SIG_Q_OP_RECV_MARK: {
ErtsRecvMarker *markp = (ErtsRecvMarker *) sig;
ASSERT(markp->in_sigq);
+ ASSERT(!markp->is_yield_mark);
if (markp->in_sigq < 0) {
/* Marked for removal... */
@@ -5806,7 +6087,7 @@ stop: {
*next_nm_sig = &c_p->sig_qs.cont;
if (c_p->sig_qs.nmsigs.last == tracing.messages.next)
c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont;
- *statep = erts_atomic32_read_nob(&c_p->state);
+ state = erts_atomic32_read_nob(&c_p->state);
}
else {
ASSERT(!c_p->sig_qs.nmsigs.next);
@@ -5814,7 +6095,6 @@ stop: {
state = erts_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_SIG_Q);
state &= ~ERTS_PSFLG_SIG_Q;
- *statep = state;
}
if (tracing.messages.next != &c_p->sig_qs.cont) {
@@ -5860,7 +6140,7 @@ stop: {
ASSERT(c_p->sig_qs.cont);
- *statep = erts_atomic32_read_nob(&c_p->state);
+ state = erts_atomic32_read_nob(&c_p->state);
res = 0;
}
@@ -5893,10 +6173,36 @@ stop: {
state = erts_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_SIG_Q);
state &= ~ERTS_PSFLG_SIG_Q;
- *statep = state;
res = !0;
}
+ if (!!(state & ERTS_PSFLG_MAYBE_SELF_SIGS)
+ & !(state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))) {
+ /*
+ * We know we do not have any outstanding signals
+ * from ourselves...
+ */
+ state = erts_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_MAYBE_SELF_SIGS);
+ state &= ~ERTS_PSFLG_MAYBE_SELF_SIGS;
+ }
+
+ if (delayed_nm_signals.first) {
+ /*
+ * We do this after clearing ERTS_PSFLG_MAYBE_SELF_SIGS
+ * since there currently are no signals that can be delayed
+ * that should be counted as originating from the process
+ * itself. If such signals appear in the future this has to
+ * be accounted for...
+ *
+ * The adjust message queue data "signal" does originate from
+ * the process itself, but it is not conceptually a signal.
+ */
+ state = restore_delayed_nm_signals(c_p, &delayed_nm_signals);
+ }
+
+ *statep = state;
+
/* Ensure that 'save' doesn't point to a receive marker... */
if (*c_p->sig_qs.save
&& ERTS_SIG_IS_RECV_MARKER(*c_p->sig_qs.save)) {
@@ -5906,7 +6212,7 @@ stop: {
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
- *redsp = cnt/4 + 1;
+ *redsp = cnt/ERTS_SIG_REDS_CNT_FACTOR + 1;
if (yield) {
int vreds = max_reds - *redsp;
@@ -6168,23 +6474,31 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp,
case ERTS_SIG_Q_OP_ADJ_MSGQ:
switch (ERTS_PROC_SIG_TYPE(tag)) {
case ERTS_SIG_Q_TYPE_CLA:
- handle_cla(c_p, sig, next_nm_sig, !0);
+ handle_cla(c_p, sig, next_nm_sig, !0, limit, NULL);
break;
case ERTS_SIG_Q_TYPE_OFF_HEAP:
- handle_move_msgq_off_heap(c_p, sig, next_nm_sig, !0);
+ handle_move_msgq_off_heap(c_p, sig, next_nm_sig, !0,
+ limit, NULL);
break;
default:
- ERTS_INTERNAL_ERROR("Invalid 'adjust-message-queue' signal type");
+ ERTS_INTERNAL_ERROR("Invalid adjust-message-queue signal type");
break;
}
break;
+ case ERTS_SIG_Q_OP_FLUSH:
+ ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);
+ c_p->sig_qs.flags |= FS_FLUSHED_SIGS;
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
destroy_trace_info((ErtsSigTraceInfo *) sig);
break;
case ERTS_SIG_Q_OP_RECV_MARK: {
ErtsRecvMarker *markp = (ErtsRecvMarker *) sig;
+ ASSERT(!markp->is_yield_mark);
markp->in_msgq = markp->in_sigq = markp->set_save = 0;
recv_marker_deallocate(c_p, markp);
break;
@@ -6276,6 +6590,7 @@ clear_seq_trace_token(ErtsMessage *sig)
case ERTS_SIG_Q_OP_RPC:
case ERTS_SIG_Q_OP_RECV_MARK:
case ERTS_SIG_Q_OP_ADJ_MSGQ:
+ case ERTS_SIG_Q_OP_FLUSH:
break;
default:
@@ -6288,8 +6603,33 @@ clear_seq_trace_token(ErtsMessage *sig)
void
erts_proc_sig_clear_seq_trace_tokens(Process *c_p)
{
- erts_proc_sig_fetch(c_p);
- ERTS_FOREACH_SIG_PRIVQS(c_p, sig, clear_seq_trace_token(sig));
+ int ix;
+ ErtsSignalInQueueBufferArray *bap;
+ int unget_info;
+ ErtsMessage *qs[] = {c_p->sig_qs.first,
+ c_p->sig_qs.cont,
+ c_p->sig_inq.first};
+
+ ERTS_LC_ASSERT(erts_thr_progress_is_blocking());
+
+ for (ix = 0; ix < sizeof(qs)/sizeof(qs[0]); ix++) {
+ ErtsMessage *sigp;
+ for (sigp = qs[ix]; sigp; sigp = sigp->next)
+ clear_seq_trace_token(sigp);
+ }
+
+ bap = erts_proc_sig_queue_get_buffers(c_p, &unget_info);
+ if (bap) {
+ for (ix = 0; ix < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; ix++) {
+ ErtsSignalInQueueBuffer* bp = &bap->slots[ix];
+ if (bp->b.alive) {
+ ErtsMessage *sigp;
+ for (sigp = bp->b.queue.first; sigp; sigp = sigp->next)
+ clear_seq_trace_token(sigp);
+ }
+ }
+ erts_proc_sig_queue_unget_buffers(bap, unget_info);
+ }
}
Uint
@@ -6336,11 +6676,6 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
break;
case ERTS_SIG_Q_OP_ADJ_MSGQ:
- if (type == ERTS_SIG_Q_TYPE_OFF_HEAP) {
- size = sizeof(ErtsMessageRef);
- break;
- }
- /* Fall through... */
case ERTS_SIG_Q_OP_SYNC_SUSPEND:
case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
case ERTS_SIG_Q_OP_IS_ALIVE:
@@ -6415,6 +6750,10 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
break;
}
+ case ERTS_SIG_Q_OP_FLUSH:
+ size = sizeof(ErtsSignalCommon);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
size = sizeof(ErtsSigTraceInfo);
break;
@@ -6533,6 +6872,7 @@ erts_proc_sig_receive_helper(Process *c_p,
if (max_reds < reds)
max_reds = reds;
#endif
+ state = erts_atomic32_read_nob(&c_p->state);
(void) erts_proc_sig_handle_incoming(c_p, &state, &reds,
max_reds, !0);
consumed_reds += reds;
@@ -6571,7 +6911,6 @@ erts_proc_sig_receive_helper(Process *c_p,
if (left_reds <= 0)
break; /* yield */
- ASSERT(!c_p->sig_qs.cont);
/* Go fetch again... */
}
@@ -6584,6 +6923,127 @@ erts_proc_sig_receive_helper(Process *c_p,
return consumed_reds;
}
+static void
+init_yield_marker(Process *c_p, ErtsRecvMarker *mrkp)
+{
+ mrkp->prev_next = NULL;
+ mrkp->is_yield_mark = (char) !0;
+ mrkp->pass = (char) 100;
+ mrkp->set_save = (char) 0;
+ mrkp->in_sigq = (char) 0;
+ mrkp->in_msgq = (char) 0;
+ mrkp->prev_ix = (char) -100;
+ mrkp->next_ix = (char) -100;
+#ifdef DEBUG
+ mrkp->used = (char) !0;
+ mrkp->proc = c_p;
+#endif
+ mrkp->sig.common.next = NULL;
+ mrkp->sig.common.specific.attachment = NULL;
+ mrkp->sig.common.tag = ERTS_RECV_MARKER_TAG;
+}
+
+static void
+remove_yield_marker(Process *c_p, ErtsRecvMarker *mrkp)
+{
+ ASSERT(mrkp);
+ ASSERT(mrkp->is_yield_mark);
+ ASSERT(mrkp->in_msgq);
+ remove_iq_sig(c_p, (ErtsMessage *) mrkp, mrkp->prev_next);
+ mrkp->in_msgq = 0;
+ mrkp->in_sigq = 0;
+ mrkp->prev_next = NULL;
+ mrkp->sig.common.next = NULL;
+}
+
+static ErtsYieldAdjMsgQ *
+create_yield_adj_msgq_data(Process *c_p)
+{
+ ErtsYieldAdjMsgQ *yp = erts_alloc(ERTS_ALC_T_SIG_YIELD_DATA,
+ sizeof(ErtsYieldAdjMsgQ));
+ init_yield_marker(c_p, &yp->next);
+ init_yield_marker(c_p, &yp->last);
+ return yp;
+}
+
+static ERTS_INLINE void
+insert_adj_msgq_yield_markers(Process *c_p,
+ ErtsYieldAdjMsgQ *yp,
+ ErtsMessage **nextpp,
+ ErtsMessage ***next_nm_sig,
+ ErtsSavedNMSignals *saved_sigs)
+{
+ ErtsMessage *sig, *nextp;
+
+ ASSERT(yp);
+ ASSERT(nextpp);
+ ASSERT(next_nm_sig && *next_nm_sig && **next_nm_sig);
+ ASSERT(!yp->next.in_msgq);
+
+ sig = **next_nm_sig;
+
+ ASSERT(ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(sig))
+ == ERTS_SIG_Q_OP_ADJ_MSGQ);
+
+ /*
+ * Insert 'next' yield marker. This is in the inner queue or
+ * in the beginning of the middle queue where we've already
+ * begun using 'prev_next' pointers for receive markers,
+ * so if a receive marker follow, we need to update it.
+ */
+ yp->next.in_msgq = !0;
+ yp->next.in_sigq = !0;
+ yp->next.prev_next = nextpp;
+ yp->next.sig.common.next = nextp = *nextpp;
+ *nextpp = (ErtsMessage *) &yp->next;
+
+ ERTS_SIG_DBG_RECV_MARK_SET_HANDLED(&yp->next);
+
+ if (nextp && ERTS_SIG_IS_RECV_MARKER(nextp)) {
+ ErtsRecvMarker *next_mrkp = (ErtsRecvMarker *) nextp;
+ next_mrkp->prev_next = &yp->next.sig.common.next;
+ }
+
+ if (yp->last.in_msgq) {
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ }
+ else {
+ /*
+ * Replace adj-msgq signal with 'last' yield marker.
+ *
+ * This is in the middle queue after the point where
+ * we've begun using 'prev_next' pointers for receive
+ * markers, so if a receive marker follows, we do not
+ * need to adjust its 'prev_next'.
+ */
+ ErtsMessage **next_sig = *next_nm_sig;
+ yp->last.in_msgq = (char) !0;
+ yp->last.in_sigq = (char) !0;
+ yp->last.prev_next = next_sig;
+ *next_nm_sig = ((ErtsSignal *) sig)->common.specific.next;
+ *next_sig = (ErtsMessage *) &yp->last;
+ remove_mq_sig(c_p, sig, &yp->last.sig.common.next, next_nm_sig);
+
+ ERTS_SIG_DBG_RECV_MARK_SET_HANDLED(&yp->last);
+ }
+
+ save_delayed_nm_signal(saved_sigs, sig);
+}
+
+static ERTS_INLINE void
+destroy_adj_msgq_yield_markers(Process *c_p, ErtsYieldAdjMsgQ **ypp)
+{
+ ErtsYieldAdjMsgQ *yp = *ypp;
+ if (yp) {
+ if (yp->next.in_msgq)
+ remove_yield_marker(c_p, &yp->next);
+ if (yp->last.in_msgq)
+ remove_yield_marker(c_p, &yp->last);
+ erts_free(ERTS_ALC_T_SIG_YIELD_DATA, yp);
+ *ypp = NULL;
+ }
+}
+
static Uint
area_literal_size(Eterm* start, Eterm* end, char* lit_start, Uint lit_size)
{
@@ -6705,17 +7165,16 @@ static int
handle_cla(Process *c_p,
ErtsMessage *sig,
ErtsMessage ***next_nm_sig,
- int exiting)
+ int exiting,
+ int limit,
+ ErtsSavedNMSignals *saved_nm_sigs)
{
- /*
- * TODO: Implement yielding support!
- */
ErtsCLAData *cla;
- ErtsMessage *msg;
+ ErtsMessage *msg, *endp;
ErtsLiteralArea *la;
char *literals;
Uint lit_bsize;
- int nmsgs, reds;
+ int nmsgs, reds, stretch_yield_limit = 0;
Eterm result = am_ok;
Uint64 cnt = 0;
@@ -6736,6 +7195,30 @@ handle_cla(Process *c_p,
* can therefore occur behind this signal.
*/
+ msg = c_p->sig_qs.first;
+ if (!msg)
+ msg = c_p->sig_qs.cont;
+
+ if (!cla->yield) {
+ endp = sig;
+ }
+ else {
+ if (!cla->yield->next.in_msgq) {
+ /* All messages already handled... */
+ ASSERT(!cla->yield->last.in_msgq);
+ stretch_yield_limit = !0;
+ endp = msg = sig;
+ }
+ else {
+ ASSERT(!!cla->yield->last.in_msgq);
+ msg = cla->yield->next.sig.common.next;
+ endp = (ErtsMessage *) &cla->yield->last;
+ remove_yield_marker(c_p, &cla->yield->next);
+ }
+ }
+
+ ASSERT(!cla->yield || !cla->yield->next.in_msgq);
+
la = ERTS_COPY_LITERAL_AREA();
if (!la) {
ASSERT(0);
@@ -6748,12 +7231,8 @@ handle_cla(Process *c_p,
literals = (char *) &la->start[0];
lit_bsize = (char *) la->end - literals;
- msg = c_p->sig_qs.first;
- if (!msg)
- msg = c_p->sig_qs.cont;
-
nmsgs = 0;
- while (msg != sig) {
+ while (msg != endp) {
ASSERT(!!msg);
nmsgs++;
if (nmsgs >= ERTS_PROC_SIG_ADJ_MSGQ_MSGS_FACTOR) {
@@ -6848,6 +7327,18 @@ handle_cla(Process *c_p,
}
}
+ if (cnt > limit) { /* yield... */
+ ErtsMessage **nextpp = !msg->next ? &c_p->sig_qs.cont : &msg->next;
+ ASSERT(*nextpp);
+ if (*nextpp == endp)
+ break; /* we're at the end; no point yielding here... */
+ if (!cla->yield)
+ cla->yield = create_yield_adj_msgq_data(c_p);
+ insert_adj_msgq_yield_markers(c_p, cla->yield, nextpp,
+ next_nm_sig, saved_nm_sigs);
+ return cnt;
+ }
+
msg = msg->next;
if (!msg)
msg = c_p->sig_qs.cont;
@@ -6855,18 +7346,36 @@ handle_cla(Process *c_p,
remove_nm_sig(c_p, sig, next_nm_sig);
- reds = 0;
- if (erts_check_copy_literals_gc_need(c_p, &reds, literals, lit_bsize))
- result = am_need_gc;
-
- cnt += reds * ERTS_SIG_REDS_CNT_FACTOR;
+ reds = erts_check_copy_literals_gc_need_max_reds(c_p);
+ cnt++;
+ if (reds > CONTEXT_REDS)
+ result = am_check_gc;
+ else if (stretch_yield_limit
+ || cnt + reds*ERTS_SIG_REDS_CNT_FACTOR <= limit) {
+ reds = 0;
+ if (erts_check_copy_literals_gc_need(c_p, &reds, literals, lit_bsize))
+ result = am_need_gc;
+ cnt += reds * ERTS_SIG_REDS_CNT_FACTOR;
+ }
+ else {
+ /* yield... */
+ if (!cla->yield)
+ cla->yield = create_yield_adj_msgq_data(c_p);
+ else if (!!cla->yield->last.in_msgq)
+ remove_yield_marker(c_p, &cla->yield->last);
+ ASSERT(!cla->yield->next.in_msgq);
+ save_delayed_nm_signal(saved_nm_sigs, sig);
+ return cnt;
+ }
done:
+ destroy_adj_msgq_yield_markers(c_p, &cla->yield);
+
send_cla_reply(c_p, sig, cla->requester, cla->request_id, result);
- if (cnt > CONTEXT_REDS)
- return CONTEXT_REDS;
+ if (cnt > CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR)
+ return CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR;
return cnt;
}
@@ -6874,12 +7383,12 @@ static int
handle_move_msgq_off_heap(Process *c_p,
ErtsMessage *sig,
ErtsMessage ***next_nm_sig,
- int exiting)
+ int exiting,
+ int limit,
+ ErtsSavedNMSignals *saved_nm_sigs)
{
- /*
- * TODO: Implement yielding support!
- */
- ErtsMessage *msg;
+ ErtsAdjOffHeapMsgQData *ohdp;
+ ErtsMessage *msg, *endp;
int nmsgs;
Uint64 cnt = 0;
@@ -6899,6 +7408,8 @@ handle_move_msgq_off_heap(Process *c_p,
cnt++;
+ ohdp = get_move_msgq_off_heap_data(sig);
+
if (exiting) {
/* signal already removed from queue... */
goto cleanup;
@@ -6919,8 +7430,21 @@ handle_move_msgq_off_heap(Process *c_p,
if (!msg)
msg = c_p->sig_qs.cont;
+ if (!ohdp->yield) {
+ endp = sig;
+ }
+ else {
+ ASSERT(!!ohdp->yield->next.in_msgq);
+ ASSERT(!!ohdp->yield->last.in_msgq);
+ msg = ohdp->yield->next.sig.common.next;
+ endp = (ErtsMessage *) &ohdp->yield->last;
+ remove_yield_marker(c_p, &ohdp->yield->next);
+ }
+
+ ASSERT(!ohdp->yield || !ohdp->yield->next.in_msgq);
+
nmsgs = 0;
- while (msg != sig) {
+ while (msg != endp) {
ASSERT(!!msg);
nmsgs++;
if (nmsgs >= ERTS_PROC_SIG_ADJ_MSGQ_MSGS_FACTOR) {
@@ -6997,6 +7521,18 @@ handle_move_msgq_off_heap(Process *c_p,
cnt += h_sz/ERTS_PROC_SIG_ADJ_MSGQ_COPY_FACTOR;
}
+ if (cnt > limit) { /* yield... */
+ ErtsMessage **nextpp = !msg->next ? &c_p->sig_qs.cont : &msg->next;
+ ASSERT(*nextpp);
+ if (*nextpp == endp)
+ break; /* we're at the end; no point yielding... */
+ if (!ohdp->yield)
+ ohdp->yield = create_yield_adj_msgq_data(c_p);
+ insert_adj_msgq_yield_markers(c_p, ohdp->yield, nextpp,
+ next_nm_sig, saved_nm_sigs);
+ return cnt;
+ }
+
msg = msg->next;
if (!msg)
msg = c_p->sig_qs.cont;
@@ -7008,13 +7544,15 @@ done:
cleanup:
+ destroy_adj_msgq_yield_markers(c_p, &ohdp->yield);
sig->next = NULL;
+ sig->data.attached = ERTS_MSG_COMBINED_HFRAG;
erts_cleanup_messages(sig);
c_p->sig_qs.flags &= ~FS_OFF_HEAP_MSGQ_CHNG;
- if (cnt > CONTEXT_REDS)
- return CONTEXT_REDS;
+ if (cnt > CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR)
+ return CONTEXT_REDS*ERTS_SIG_REDS_CNT_FACTOR;
return cnt;
}
@@ -7242,7 +7780,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int info_on_self,
- ErtsMessageInfo *mip)
+ ErtsMessageInfo *mip,
+ Sint *msgq_len_p)
{
Uint tot_heap_size;
ErtsMessage *mp, **mpp;
@@ -7316,7 +7855,11 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
mp = mp->next;
}
- ASSERT(c_p->sig_qs.len == i);
+
+ ASSERT(info_on_self || c_p->sig_qs.len == i);
+ ASSERT(!info_on_self || c_p->sig_qs.len >= i);
+
+ *msgq_len_p = i;
return tot_heap_size;
}
@@ -7405,7 +7948,6 @@ erts_internal_dirty_process_handle_signals_1(BIF_ALIST_1)
BIF_RET(am_normal); /* will handle signals itself... */
}
else {
- erts_aint32_t state;
int done;
Eterm res = am_false;
int reds = 0;
@@ -7451,9 +7993,13 @@ erts_proc_sig_cleanup_queues(Process *c_p)
while (sig) {
ErtsMessage *free_sig = sig;
sig = sig->next;
- if (ERTS_SIG_IS_RECV_MARKER(free_sig))
- recv_marker_deallocate(c_p, (ErtsRecvMarker *) free_sig);
+ if (ERTS_SIG_IS_RECV_MARKER(free_sig)) {
+ ErtsRecvMarker *recv_mark = (ErtsRecvMarker *) free_sig;
+ ASSERT(!recv_mark->is_yield_mark);
+ recv_marker_deallocate(c_p, recv_mark);
+ }
else {
+ ASSERT(ERTS_SIG_IS_MSG(free_sig));
free_sig->next = NULL;
erts_cleanup_messages(free_sig);
}
@@ -7473,7 +8019,7 @@ erts_proc_sig_cleanup_queues(Process *c_p)
/* Debug */
-static void
+static erts_aint32_t
wait_handle_signals(Process *c_p)
{
/*
@@ -7523,6 +8069,8 @@ wait_handle_signals(Process *c_p)
c_p->sig_qs.flags &= ~FS_WAIT_HANDLE_SIGS;
c_p->sig_qs.flags |= FS_HANDLING_SIGS;
+
+ return erts_atomic32_read_mb(&c_p->state);
}
static void
@@ -7649,9 +8197,6 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
break;
case ERTS_SIG_Q_OP_ADJ_MSGQ:
- if (type == ERTS_SIG_Q_TYPE_OFF_HEAP)
- break;
- /* Fall through... */
case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg);
break;
@@ -7704,6 +8249,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
case ERTS_SIG_Q_OP_PROCESS_INFO:
case ERTS_SIG_Q_OP_RECV_MARK:
case ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK:
+ case ERTS_SIG_Q_OP_FLUSH:
break;
default:
@@ -7993,7 +8539,10 @@ proc_sig_hdbg_check_queue(Process *proc,
if (sig_psflg != ERTS_PSFLG_FREE) {
erts_aint32_t state = erts_atomic32_read_nob(&proc->state);
- ERTS_ASSERT(nm_sigs ? !!(state & sig_psflg) : !(state & sig_psflg));
+ ERTS_ASSERT(nm_sigs
+ ? !!(state & sig_psflg)
+ : (!(state & sig_psflg)
+ || !!erts_atomic_read_nob(&proc->sig_inq_buffers)));
}
return msg_len;
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index b3aefff0b0..8faa79507f 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -110,7 +110,7 @@ typedef struct {
* Note that not all signal are handled using this functionality!
*/
-#define ERTS_SIG_Q_OP_MAX 18
+#define ERTS_SIG_Q_OP_MAX 19
#define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */
#define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/
@@ -130,7 +130,8 @@ typedef struct {
#define ERTS_SIG_Q_OP_ALIAS_MSG 15
#define ERTS_SIG_Q_OP_RECV_MARK 16
#define ERTS_SIG_Q_OP_UNLINK_ACK 17
-#define ERTS_SIG_Q_OP_ADJ_MSGQ ERTS_SIG_Q_OP_MAX
+#define ERTS_SIG_Q_OP_ADJ_MSGQ 18
+#define ERTS_SIG_Q_OP_FLUSH ERTS_SIG_Q_OP_MAX
#define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 10)
@@ -1133,6 +1134,9 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to);
*
* @param[out] statep Pointer to process state after
* signal handling. May not be NULL.
+ * The state should recently have
+ * been updated before calling
+ * this function.
*
* @param[in,out] redsp Pointer to an integer containing
* reductions. On input, the amount
@@ -1253,6 +1257,58 @@ erts_proc_sig_receive_helper(Process *c_p, int fcalls,
int neg_o_reds, ErtsMessage **msgpp,
int *get_outp);
+/*
+ * CLEAN_SIGQ - Flush until middle queue is empty, i.e.
+ * the content of inner+middle queue equals
+ * the message queue.
+ */
+#define ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ (1 << 0)
+/*
+ * FROM_ALL - Flush signals from all local senders (processes
+ * and ports).
+ */
+#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL (1 << 1)
+/*
+ * FROM_ID - Flush signals from process or port identified
+ * by 'id'.
+ */
+#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ID (1 << 2)
+
+/*
+ * All erts_proc_sig_init_flush_signals() flags.
+ */
+#define ERTS_PROC_SIG_FLUSH_FLGS \
+ (ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ \
+ | ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL \
+ | ERTS_PROC_SIG_FLUSH_FLG_FROM_ID)
+
+/**
+ *
+ * @brief Initialize flush of signals from another process or port
+ *
+ * Inserts a flush signal in the outer signal queue of
+ * current process and sets the FS_FLUSHING_SIGS flag in
+ * 'c_p->sig_qs.flags'. When the flush signal has been
+ * handled the FS_FLUSHED_SIGS flag is set as well.
+ *
+ * While the flushing is ongoing the process *should* only
+ * handle incoming signals and not execute Erlang code. When
+ * the functionality that initiated the flush detects that
+ * the flush is done by the FS_FLUSHED_SIGS flag being set,
+ * it should clear both the FS_FLUSHED_SIGS flag and the
+ * FS_FLUSHING_SIGS flag.
+ *
+ * @param[in] c_p Pointer to process struct of
+ * currently executing process.
+ * flags Flags indicating how to flush.
+ * (see above).
+ * from Identifier of sender to flush
+ * signals from in case the
+ * FROM_ID flag is set.
+ */
+void
+erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm from);
+
/**
*
* @brief Fetch signals from the outer queue
@@ -1370,12 +1426,16 @@ typedef struct {
*
* @param[in] mip Pointer to array of
* ErtsMessageInfo structures.
+ *
+ * @param[out] msgq_lenp Pointer to integer containing
+ * amount of messages.
*/
Uint erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int info_on_self,
- ErtsMessageInfo *mip);
+ ErtsMessageInfo *mip,
+ Sint *msgq_lenp);
/**
*
@@ -1676,7 +1736,7 @@ Eterm erts_msgq_recv_marker_create_insert(Process *c_p, Eterm id);
void erts_msgq_recv_marker_create_insert_set_save(Process *c_p, Eterm id);
ErtsMessage **erts_msgq_pass_recv_markers(Process *c_p,
ErtsMessage **markpp);
-void erts_msgq_remove_leading_recv_markers(Process *c_p);
+void erts_msgq_remove_leading_recv_markers_set_save_first(Process *c_p);
#define ERTS_RECV_MARKER_IX__(BLKP, MRKP) \
((int) ((MRKP) - &(BLKP)->marker[0]))
@@ -1725,6 +1785,10 @@ erts_proc_sig_fetch(Process *proc)
== (ERTS_PROC_LOCK_MAIN
| ERTS_PROC_LOCK_MSGQ)));
+ ASSERT(!(proc->sig_qs.flags & FS_FLUSHING_SIGS)
+ || ERTS_PROC_IS_EXITING(proc)
+ || ERTS_IS_CRASH_DUMPING);
+
ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc);
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0);
@@ -2048,8 +2112,9 @@ erts_msgq_set_save_first(Process *c_p)
* anymore...
*/
if (c_p->sig_qs.first && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first))
- erts_msgq_remove_leading_recv_markers(c_p);
- c_p->sig_qs.save = &c_p->sig_qs.first;
+ erts_msgq_remove_leading_recv_markers_set_save_first(c_p);
+ else
+ c_p->sig_qs.save = &c_p->sig_qs.first;
}
ERTS_GLB_INLINE void
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index ff81b453a7..00b6d16862 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -6631,10 +6631,12 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p,
*/
/*
- * No normal execution until dirty CLA or hibernat has
+ * No normal execution until dirty CLA or hibernate has
* been handled...
*/
- ASSERT(!(p->flags & (F_DIRTY_CLA | F_DIRTY_GC_HIBERNATE)));
+ ASSERT(!(p->flags & (F_DIRTY_CHECK_CLA
+ | F_DIRTY_CLA
+ | F_DIRTY_GC_HIBERNATE)));
a = erts_atomic32_read_band_nob(&p->state,
~ERTS_PSFLG_DIRTY_ACTIVE_SYS);
@@ -10063,27 +10065,35 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
/* On normal scheduler */
if (state & ERTS_PSFLG_RUNNING_SYS) {
if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) {
- int local_only = (!!(p->sig_qs.flags & FS_LOCAL_SIGS_ONLY)
- & !(state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLGS_DIRTY_WORK)));
- if (!local_only | !!(state & ERTS_PSFLG_SIG_Q)) {
- int sig_reds;
+ int sig_reds;
+ /*
+ * If we have dirty work scheduled we allow
+ * usage of all reductions since we need to
+ * handle all signals before doing dirty
+ * work...
+ *
+ * If a BIF is flushing signals, we also allow
+ * usage of all reductions since the BIF cannot
+ * continue execution until the flush
+ * completes...
+ */
+ sig_reds = reds;
+ if (((state & (ERTS_PSFLGS_DIRTY_WORK
+ | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE)
+ && !(p->sig_qs.flags & FS_FLUSHING_SIGS)) {
/*
- * If we have dirty work scheduled we allow
- * usage of all reductions since we need to
- * handle all signals before doing dirty
- * work...
+ * We are active, i.e., have erlang work to do,
+ * and have no dirty work and are not flushing;
+ * limit the amount of signal handling work...
*/
- if (state & ERTS_PSFLGS_DIRTY_WORK)
- sig_reds = reds;
- else
- sig_reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED;
- (void) erts_proc_sig_handle_incoming(p,
- &state,
- &sig_reds,
- sig_reds,
- local_only);
- reds -= sig_reds;
+ sig_reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED;
}
+ (void) erts_proc_sig_handle_incoming(p,
+ &state,
+ &sig_reds,
+ sig_reds,
+ 0);
+ reds -= sig_reds;
}
if ((state & (ERTS_PSFLG_SYS_TASKS
| ERTS_PSFLG_EXITING)) == ERTS_PSFLG_SYS_TASKS) {
@@ -10093,8 +10103,14 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
* hand written beam assembly in
* prim_eval:'receive'. If GC is delayed we are
* not allowed to execute system tasks.
+ *
+ * We also don't allow execution of system tasks
+ * if a BIF is flushing signals, since there are
+ * system tasks that might need to fetch from the
+ * outer signal queue...
*/
- if (!(p->flags & F_DELAY_GC)) {
+ if (!(p->flags & F_DELAY_GC)
+ && !(p->sig_qs.flags & FS_FLUSHING_SIGS)) {
int cost = execute_sys_tasks(p, &state, reds);
calls += cost;
reds -= cost;
@@ -10591,25 +10607,49 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
int fcalls;
int cla_reds = 0;
- if (!ERTS_PROC_GET_SAVED_CALLS_BUF(c_p))
- fcalls = reds;
- else
- fcalls = reds - CONTEXT_REDS;
- st_res = erts_copy_literals_gc(c_p, &cla_reds, fcalls);
- reds -= cla_reds;
- if (is_non_value(st_res)) {
- if (c_p->flags & F_DIRTY_CLA) {
- save_dirty_task(c_p, st);
- st = NULL;
- break;
- }
- /* Needed gc, but gc was disabled */
- save_gc_task(c_p, st, st_prio);
- st = NULL;
- break;
- }
- /* We did a major gc */
- minor_gc = major_gc = 1;
+ if (st->arg[0] == am_true) {
+ /*
+ * Check if copy literal area GC is needed and only
+ * do GC if needed. This check is never requested unless
+ * we know that this is too much work to do on a normal
+ * scheduler, so we do not even try to check it here
+ * but instead unconditionally schedule this as dirty
+ * work...
+ */
+ if (c_p->flags & F_DISABLE_GC) {
+ /* We might need to GC, but GC was disabled */
+ save_gc_task(c_p, st, st_prio);
+ st = NULL;
+ }
+ else {
+ c_p->flags |= F_DIRTY_CHECK_CLA;
+ save_dirty_task(c_p, st);
+ st = NULL;
+ erts_schedule_dirty_sys_execution(c_p);
+ }
+ }
+ else {
+ /* Copy literal area GC needed... */
+ if (!ERTS_PROC_GET_SAVED_CALLS_BUF(c_p))
+ fcalls = reds;
+ else
+ fcalls = reds - CONTEXT_REDS;
+ st_res = erts_copy_literals_gc(c_p, &cla_reds, fcalls);
+ reds -= cla_reds;
+ if (is_non_value(st_res)) {
+ if (c_p->flags & F_DIRTY_CLA) {
+ save_dirty_task(c_p, st);
+ st = NULL;
+ break;
+ }
+ /* Needed gc, but gc was disabled */
+ save_gc_task(c_p, st, st_prio);
+ st = NULL;
+ break;
+ }
+ /* We did a major gc */
+ minor_gc = major_gc = 1;
+ }
break;
}
case ERTS_PSTT_FTMQ:
@@ -10622,15 +10662,19 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
break;
case ERTS_PSTT_PRIO_SIG: {
erts_aint32_t fail_state, state;
- int sig_res, sig_reds = reds;
+ int sig_res, sig_reds;
st_res = am_false;
+ ASSERT(!(c_p->sig_qs.flags & FS_FLUSHING_SIGS));
+
if (st->arg[0] == am_false) {
erts_proc_sig_queue_lock(c_p);
erts_proc_sig_fetch(c_p);
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ st->arg[0] = am_true;
}
+ state = erts_atomic32_read_nob(&c_p->state);
sig_reds = reds;
sig_res = erts_proc_sig_handle_incoming(c_p, &state, &sig_reds,
reds, !0);
@@ -10644,8 +10688,6 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
if (sig_res)
break;
- st->arg[0] = am_true;
-
fail_state = ERTS_PSFLG_EXITING;
if (schedule_process_sys_task(c_p, st_prio, st, &fail_state)) {
@@ -10656,6 +10698,7 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
state = erts_atomic32_read_nob(&c_p->state);
exit_permanent_prio_elevation(c_p, state, st_prio);
}
+
break;
}
case ERTS_PSTT_TEST:
@@ -10811,10 +10854,12 @@ erts_execute_dirty_system_task(Process *c_p)
/*
* If multiple operations, perform them in the following
* order (in order to avoid unnecessary GC):
- * 1. Copy Literal Area (implies major GC).
- * 2. GC Hibernate (implies major GC if not woken).
- * 3. Major GC (implies minor GC).
- * 4. Minor GC.
+ * 1. Check for Copy Literals Area GC need. This may
+ * trigger a Copy Literals Area GC.
+ * 2. Copy Literal Area GC (implies major GC).
+ * 3. GC Hibernate (implies major GC if not woken).
+ * 4. Major GC (implies minor GC).
+ * 5. Minor GC.
*
* System task requests are handled after the actual
* operations have been performed...
@@ -10822,6 +10867,37 @@ erts_execute_dirty_system_task(Process *c_p)
ASSERT(!(c_p->flags & (F_DELAY_GC|F_DISABLE_GC)));
+ if (c_p->flags & F_DIRTY_CHECK_CLA) {
+ ErtsLiteralArea *la = ERTS_COPY_LITERAL_AREA();
+
+ ASSERT(!(c_p->flags & F_DIRTY_CLA));
+ c_p->flags &= ~F_DIRTY_CHECK_CLA;
+ if (!la)
+ cla_res = am_ok;
+ else {
+ int check_cla_reds = 0;
+ char *literals = (char *) &la->start[0];
+ Uint lit_bsize = (char *) la->end - literals;
+ if (erts_check_copy_literals_gc_need(c_p,
+ &check_cla_reds,
+ literals,
+ lit_bsize)) {
+ /*
+ * We had references to this literal area on the heap;
+ * need a copy literals GC...
+ */
+ c_p->flags |= F_DIRTY_CLA;
+ }
+ else {
+ /*
+ * We have no references to this literal area on the
+ * heap; no copy literals GC needed...
+ */
+ cla_res = am_ok;
+ }
+ }
+ }
+
if (c_p->flags & F_DIRTY_CLA) {
int cla_reds = 0;
cla_res = erts_copy_literals_gc(c_p, &cla_reds, c_p->fcalls);
@@ -10850,7 +10926,8 @@ erts_execute_dirty_system_task(Process *c_p)
c_p->arity, c_p->fcalls);
}
- ASSERT(!(c_p->flags & (F_DIRTY_CLA
+ ASSERT(!(c_p->flags & (F_DIRTY_CHECK_CLA
+ | F_DIRTY_CLA
| F_DIRTY_GC_HIBERNATE
| F_DIRTY_MAJOR_GC
| F_DIRTY_MINOR_GC)));
@@ -11234,7 +11311,7 @@ erts_internal_request_system_task_4(BIF_ALIST_4)
}
void
-erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id)
+erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id, int check)
{
Process *rp;
ErtsProcSysTask *st;
@@ -11260,7 +11337,8 @@ erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id)
req_id_sz,
&hp,
&st->off_heap);
- for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++)
+ st->arg[0] = check ? am_true : am_false;
+ for (i = 1; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++)
st->arg[i] = THE_NON_VALUE;
rp = erts_proc_lookup_raw(to);
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 81958e373a..8dc55a8da0 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -1246,8 +1246,9 @@ void erts_check_for_holes(Process* p);
process table. Always ACTIVE while EXITING. Never
SUSPENDED unless also FREE. */
#define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(5)
-/* UNUSED */
-#define ERTS_PSFLG_UNUSED ERTS_PSFLG_BIT(6)
+/* MAYBE_SELF_SIGS - We might have outstanding signals
+ from ourselves to ourselves. */
+#define ERTS_PSFLG_MAYBE_SELF_SIGS ERTS_PSFLG_BIT(6)
/* ACTIVE - Process "wants" to execute */
#define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(7)
/* IN_RUNQ - Real process (not proxy) struct used in a
@@ -1567,15 +1568,18 @@ extern int erts_system_profile_ts_type;
#define F_TRAP_EXIT (1 << 22) /* Trapping exit */
#define F_FRAGMENTED_SEND (1 << 23) /* Process is doing a distributed fragmented send */
#define F_DBG_FORCED_TRAP (1 << 24) /* DEBUG: Last BIF call was a forced trap */
+#define F_DIRTY_CHECK_CLA (1 << 25) /* Check if copy literal area GC scheduled */
/* Signal queue flags */
#define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */
#define FS_ON_HEAP_MSGQ (1 << 1) /* On heap msg queue */
#define FS_OFF_HEAP_MSGQ_CHNG (1 << 2) /* Off heap msg queue changing */
-#define FS_LOCAL_SIGS_ONLY (1 << 3) /* Handle privq sigs only */
+#define FS_UNUSED (1 << 3) /* Unused */
#define FS_HANDLING_SIGS (1 << 4) /* Process is handling signals */
#define FS_WAIT_HANDLE_SIGS (1 << 5) /* Process is waiting to handle signals */
#define FS_DELAYED_PSIGQS_LEN (1 << 6) /* Delayed update of sig_qs.len */
+#define FS_FLUSHING_SIGS (1 << 7) /* Currently flushing signals */
+#define FS_FLUSHED_SIGS (1 << 8) /* Flushing of signals completed */
/*
* F_DISABLE_GC and F_DELAY_GC are similar. Both will prevent
@@ -1931,7 +1935,7 @@ void erts_schedule_thr_prgr_later_cleanup_op(void (*)(void *),
ErtsThrPrgrLaterOp *,
UWord);
void erts_schedule_complete_off_heap_message_queue_change(Eterm pid);
-void erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id);
+void erts_schedule_cla_gc(Process *c_p, Eterm to, Eterm req_id, int check);
struct db_fixation;
void erts_schedule_ets_free_fixation(Eterm pid, struct db_fixation*);
void erts_schedule_flush_trace_messages(Process *proc, int force_on_proc);
diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c
index f95c7ad1d7..9fa5969b5e 100644
--- a/erts/emulator/beam/erl_process_dump.c
+++ b/erts/emulator/beam/erl_process_dump.c
@@ -159,9 +159,12 @@ Uint erts_process_memory(Process *p, int include_sigs_in_transit)
* Size of message queue plus size of all signals
* in transit to the process!
*/
- erts_proc_sig_queue_lock(p);
- erts_proc_sig_fetch(p);
- erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ if (!(p->sig_qs.flags & FS_FLUSHING_SIGS)
+ || ERTS_IS_CRASH_DUMPING) {
+ erts_proc_sig_queue_lock(p);
+ erts_proc_sig_fetch(p);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ }
ERTS_FOREACH_SIG_PRIVQS(
p, mp,
@@ -228,7 +231,9 @@ dump_process_info(fmtfn_t to, void *to_arg, Process *p)
if (ERTS_TRACE_FLAGS(p) & F_SENSITIVE)
return;
- erts_proc_sig_fetch(p);
+ if (!(p->sig_qs.flags & FS_FLUSHING_SIGS) || ERTS_IS_CRASH_DUMPING) {
+ erts_proc_sig_fetch(p);
+ }
if (p->sig_qs.first || p->sig_qs.cont) {
erts_print(to, to_arg, "=proc_messages:%T\n", p->common.id);
@@ -1123,8 +1128,8 @@ erts_dump_extended_process_state(fmtfn_t to, void *to_arg, erts_aint32_t psflg)
erts_print(to, to_arg, "FREE"); break;
case ERTS_PSFLG_EXITING:
erts_print(to, to_arg, "EXITING"); break;
- case ERTS_PSFLG_UNUSED:
- erts_print(to, to_arg, "UNUSED"); break;
+ case ERTS_PSFLG_MAYBE_SELF_SIGS:
+ erts_print(to, to_arg, "MAYBE_SELF_SIGS"); break;
case ERTS_PSFLG_ACTIVE:
erts_print(to, to_arg, "ACTIVE"); break;
case ERTS_PSFLG_IN_RUNQ:
diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h
index f892d0dd89..f54596cf58 100644
--- a/erts/emulator/beam/global.h
+++ b/erts/emulator/beam/global.h
@@ -957,6 +957,7 @@ Eterm erl_is_function(Process* p, Eterm arg1, Eterm arg2);
Eterm erts_check_process_code(Process *c_p, Eterm module, int *redsp, int fcalls);
#define ERTS_CLA_SCAN_WORDS_PER_RED 512
+int erts_check_copy_literals_gc_need_max_reds(Process *c_p);
int erts_check_copy_literals_gc_need(Process *c_p, int *redsp,
char *literals, Uint lit_bsize);
Eterm erts_copy_literals_gc(Process *c_p, int *redsp, int fcalls);
diff --git a/erts/emulator/test/bif_SUITE.erl b/erts/emulator/test/bif_SUITE.erl
index c5be79528b..ee6c494145 100644
--- a/erts/emulator/test/bif_SUITE.erl
+++ b/erts/emulator/test/bif_SUITE.erl
@@ -37,6 +37,7 @@
error_stacktrace_during_call_trace/1,
group_leader_prio/1, group_leader_prio_dirty/1,
is_process_alive/1,
+ is_process_alive_signal_from/1,
process_info_blast/1,
os_env_case_sensitivity/1,
verify_middle_queue_save/1,
@@ -57,7 +58,8 @@ all() ->
erl_crash_dump_bytes, min_max, erlang_halt, is_builtin,
error_stacktrace, error_stacktrace_during_call_trace,
group_leader_prio, group_leader_prio_dirty,
- is_process_alive, process_info_blast, os_env_case_sensitivity,
+ is_process_alive, is_process_alive_signal_from,
+ process_info_blast, os_env_case_sensitivity,
verify_middle_queue_save, test_length,fixed_apply_badarg,
external_fun_apply3].
@@ -1205,6 +1207,51 @@ is_process_alive(Config) when is_list(Config) ->
Ps),
ok.
+is_process_alive_signal_from(Config) when is_list(Config) ->
+ process_flag(priority, high),
+ process_flag(scheduler, 1),
+ Schdlr = case erlang:system_info(schedulers_online) of
+ 1 -> 1;
+ _ -> 2
+ end,
+ X = is_process_alive_signal_from_test(100000, 0, Schdlr),
+ erlang:display({exits_detected, X}),
+ {comment, integer_to_list(X) ++ " exited processes detected"}.
+
+is_process_alive_signal_from_test(0, X, _Schdlr) ->
+ X;
+is_process_alive_signal_from_test(N, X, Schdlr) ->
+ Tester = self(),
+ {Testee, TMon} = spawn_opt(fun () ->
+ Mon = erlang:monitor(process, Tester),
+ Tester ! {self(), ready},
+ busy_wait_go(),
+ _ = erlang:demonitor(Mon),
+ exit(normal)
+ end,
+ [link,
+ monitor,
+ {priority, high},
+ {scheduler, Schdlr}]),
+ receive {Testee, ready} -> ok end,
+ {monitored_by, MBList1} = process_info(self(), monitored_by),
+ true = lists:member(Testee, MBList1),
+ erlang:yield(),
+ Testee ! {go, ok},
+ erlang:yield(),
+ NewX = case erlang:is_process_alive(Testee) of
+ true ->
+ X;
+ false ->
+ %% Demonitor signal should have reached us before the
+ %% is-process-alive reply...
+ {monitored_by, MBList2} = process_info(self(), monitored_by),
+ false = lists:member(Testee, MBList2),
+ X+1
+ end,
+ receive {'DOWN', TMon, process, Testee, normal} -> ok end,
+ is_process_alive_signal_from_test(N-1, NewX, Schdlr).
+
process_info_blast(Config) when is_list(Config) ->
Tester = self(),
NoAttackers = 1000,
@@ -1396,7 +1443,16 @@ external_fun_apply3(_Config) ->
ok.
%% helpers
-
+
+busy_wait_go() ->
+ receive
+ {go, Info} ->
+ Info
+ after
+ 0 ->
+ busy_wait_go()
+ end.
+
id(I) -> I.
%% Get code path, including the path for the erts application.
diff --git a/erts/emulator/test/literal_area_collector_test.erl b/erts/emulator/test/literal_area_collector_test.erl
index 14583db88d..d0b6fb3822 100644
--- a/erts/emulator/test/literal_area_collector_test.erl
+++ b/erts/emulator/test/literal_area_collector_test.erl
@@ -28,38 +28,29 @@ check_idle(Timeout) when is_integer(Timeout) > 0 ->
ScaledTimeout = Timeout*test_server:timetrap_scale_factor(),
Pid = find_literal_area_collector(),
Start = erlang:monotonic_time(millisecond),
- try
- wait_for_idle_literal_collector(Pid, Start, ScaledTimeout, -1, 0)
- catch
- throw:done ->
- ok
- end.
+ Alias = alias(),
+ wait_for_idle_literal_collector(Pid, Alias, Start, ScaledTimeout).
-wait_for_idle_literal_collector(Pid, Start, Timeout, NWaiting, WRedsStart) ->
- {W, R} = case process_info(Pid, [status, reductions]) of
- [{status, waiting}, {reductions, Reds}] ->
- %% Assume that reds aren't bumped more than
- %% 2 in order to service this process info
- %% request...
- case {NWaiting > 100, Reds - WRedsStart =< 2*NWaiting} of
- {true, true} ->
- throw(done);
- {false, true} ->
- {NWaiting+1, WRedsStart};
- _ ->
- {0, Reds}
- end;
- _ ->
- {-1, 0}
- end,
+wait_for_idle_literal_collector(Pid, Alias, Start, Timeout) ->
+ Ref = make_ref(),
+ Pid ! {get_status, Ref, Alias},
Now = erlang:monotonic_time(millisecond),
- if Now - Start > Timeout ->
- error({busy_literal_area_collecor_timout, Timeout});
- true ->
- ok
- end,
- receive after 1 -> ok end,
- wait_for_idle_literal_collector(Pid, Start, Timeout, W, R).
+ TMO = case Start + Timeout - Now of
+ TimeLeft when TimeLeft < 0 -> 0;
+ TimeLeft -> TimeLeft
+ end,
+ receive
+ {Ref, idle} ->
+ unalias(Alias),
+ ok;
+ {Ref, _} ->
+ receive after 10 -> ok end,
+ wait_for_idle_literal_collector(Pid, Alias, Start, Timeout)
+ after TMO ->
+ unalias(Alias),
+ receive {Ref, _} -> ok after 0 -> ok end,
+ error({busy_literal_area_collecor_timout, Timeout})
+ end.
find_literal_area_collector() ->
case get('__literal_area_collector__') of
diff --git a/erts/emulator/test/nif_SUITE.erl b/erts/emulator/test/nif_SUITE.erl
index 4aa9cf6b9f..2f28b4a0e9 100644
--- a/erts/emulator/test/nif_SUITE.erl
+++ b/erts/emulator/test/nif_SUITE.erl
@@ -1152,6 +1152,7 @@ monitor_process_c(Config) ->
Pid = spawn_link(fun() ->
R_ptr = alloc_monitor_resource_nif(),
{0,Mon} = monitor_process_nif(R_ptr, self(), true, Papa),
+ receive after 1000 -> ok end,
[R_ptr] = monitored_by(self()),
put(store, make_resource(R_ptr)),
ok = release_resource(R_ptr),
@@ -1159,8 +1160,8 @@ monitor_process_c(Config) ->
Papa ! {self(), done, R_ptr, Mon},
exit
end),
- [{Pid, done, R_ptr, Mon1},
- {monitor_resource_down, R_ptr, Pid, Mon2}] = flush(2),
+ receive {Pid, done, R_ptr, Mon1} -> ok end,
+ [{monitor_resource_down, R_ptr, Pid, Mon2}] = flush(1),
compare_monitors_nif(Mon1, Mon2),
{R_ptr, _, 1} = last_resource_dtor_call(),
ok.
diff --git a/erts/emulator/test/port_SUITE.erl b/erts/emulator/test/port_SUITE.erl
index 4a3ecef397..1fa6922648 100644
--- a/erts/emulator/test/port_SUITE.erl
+++ b/erts/emulator/test/port_SUITE.erl
@@ -1915,6 +1915,7 @@ otp_5112(Config) when is_list(Config) ->
true = lists:member(Port, Links1),
Port ! {self(), {command, ""}},
wait_until(fun () -> lists:member(Port, erlang:ports()) == false end),
+ receive after 1000 -> ok end, %% Give signal some time to propagate...
{links, Links2} = process_info(self(),links),
io:format("Links2: ~p~n",[Links2]),
false = lists:member(Port, Links2), %% This used to fail
diff --git a/erts/emulator/test/process_SUITE.erl b/erts/emulator/test/process_SUITE.erl
index 0e549d424c..23a208adb2 100644
--- a/erts/emulator/test/process_SUITE.erl
+++ b/erts/emulator/test/process_SUITE.erl
@@ -49,6 +49,10 @@
process_info_smoke_all/1,
process_info_status_handled_signal/1,
process_info_reductions/1,
+ process_info_self_signal/1,
+ process_info_self_msgq_len/1,
+ process_info_self_msgq_len_messages/1,
+ process_info_self_msgq_len_more/1,
bump_reductions/1, low_prio/1, binary_owner/1, yield/1, yield2/1,
otp_4725/1, dist_unlink_ack_exit_leak/1, bad_register/1,
garbage_collect/1, otp_6237/1,
@@ -107,21 +111,8 @@ suite() ->
all() ->
[spawn_with_binaries, t_exit_1, {group, t_exit_2},
trap_exit_badarg, trap_exit_badarg_in_bif,
- t_process_info, process_info_other, process_info_other_msg,
- process_info_other_dist_msg, process_info_other_status,
- process_info_2_list,
- process_info_lock_reschedule,
- process_info_lock_reschedule2,
- process_info_lock_reschedule3,
- process_info_other_message_queue_len_signal_race,
- process_info_garbage_collection,
- process_info_parent,
- process_info_smoke_all,
- process_info_status_handled_signal,
- process_info_reductions,
bump_reductions, low_prio, yield, yield2, otp_4725,
- dist_unlink_ack_exit_leak,
- bad_register, garbage_collect, process_info_messages,
+ dist_unlink_ack_exit_leak, bad_register, garbage_collect,
process_flag_badarg,
process_flag_fullsweep_after, process_flag_heap_size,
command_line_max_heap_size,
@@ -129,6 +120,7 @@ all() ->
spawn_huge_arglist,
otp_6237,
{group, spawn_request},
+ {group, process_info_bif},
{group, processes_bif},
{group, otp_7738}, garb_other_running,
{group, system_task},
@@ -159,6 +151,24 @@ groups() ->
processes_small_tab, processes_this_tab,
processes_last_call_trap, processes_apply_trap,
processes_gc_trap, processes_term_proc_list]},
+ {process_info_bif, [],
+ [t_process_info, process_info_messages,
+ process_info_other, process_info_other_msg,
+ process_info_other_message_queue_len_signal_race,
+ process_info_other_dist_msg, process_info_other_status,
+ process_info_2_list,
+ process_info_lock_reschedule,
+ process_info_lock_reschedule2,
+ process_info_lock_reschedule3,
+ process_info_garbage_collection,
+ process_info_parent,
+ process_info_smoke_all,
+ process_info_status_handled_signal,
+ process_info_reductions,
+ process_info_self_signal,
+ process_info_self_msgq_len,
+ process_info_self_msgq_len_messages,
+ process_info_self_msgq_len_more]},
{otp_7738, [],
[otp_7738_waiting, otp_7738_suspended,
otp_7738_resume]},
@@ -1351,6 +1361,146 @@ pi_reductions_main_unlocker_loop(Other) ->
erlang:yield(),
pi_reductions_main_unlocker_loop(Other).
+process_info_self_signal(Config) when is_list(Config) ->
+ %% Test that signals that we have sent to ourselves are
+ %% visible in process_info() result. This is not strictly
+ %% a necessary property, but implemented so now. See
+ %% process_info.c:process_info_bif() for more info.
+ Self = self(),
+ Ref = make_ref(),
+ pi_sig_spam_test(fun () ->
+ process_info_self_signal_spammer(Self)
+ end,
+ fun () ->
+ self() ! Ref,
+ process_info(self(), messages)
+ end,
+ fun (Res) ->
+ {messages, [Ref]} = Res
+ end).
+
+process_info_self_signal_spammer(To) ->
+ erlang:demonitor(erlang:monitor(process, To)),
+ process_info_self_signal_spammer(To).
+
+process_info_self_msgq_len(Config) when is_list(Config) ->
+ %% Spam ourselves with signals forcing us to flush our own
+ %% signal queue...
+ Self = self(),
+ pi_sig_spam_test(fun () ->
+ process_info_self_msgq_len_spammer(Self)
+ end,
+ fun () ->
+ process_info(self(), message_queue_len)
+ end,
+ fun (Res) ->
+ {message_queue_len, Len} = Res,
+ true = Len > 0,
+ ok
+ end).
+
+
+process_info_self_msgq_len_messages(Config) when is_list(Config) ->
+ %% Spam ourselves with signals normally forcing us to flush own
+ %% signal queue, but since we also want messages it won't be flushed...
+ Self = self(),
+ pi_sig_spam_test(fun () ->
+ process_info_self_msgq_len_spammer(Self, 100000)
+ end,
+ fun () ->
+ process_info(self(),
+ [message_queue_len,
+ messages])
+ end,
+ fun (Res) ->
+ [{message_queue_len, Len},
+ {messages, Msgs}] = Res,
+ Len = length(Msgs),
+ ok
+ end).
+
+process_info_self_msgq_len_more(Config) when is_list(Config) ->
+ self() ! hej,
+ BodyRes = process_info_self_msgq_len_more_caller_body(),
+ ok = process_info_self_msgq_len_more_caller_body_result(BodyRes),
+ TailRes = process_info_self_msgq_len_more_caller_tail(),
+ ok = process_info_self_msgq_len_more_caller_tail_result(TailRes),
+ receive hej -> ok end,
+ %% Check that current_function, current_location, and
+ %% current_stacktrace give sane results flushing or not...
+ Self = self(),
+ pi_sig_spam_test(fun () ->
+ process_info_self_msgq_len_spammer(Self)
+ end,
+ fun process_info_self_msgq_len_more_caller_body/0,
+ fun process_info_self_msgq_len_more_caller_body_result/1),
+ pi_sig_spam_test(fun () ->
+ process_info_self_msgq_len_spammer(Self)
+ end,
+ fun process_info_self_msgq_len_more_caller_tail/0,
+ fun process_info_self_msgq_len_more_caller_tail_result/1).
+
+process_info_self_msgq_len_more_caller_body() ->
+ Res = process_info(self(),
+ [message_queue_len,
+ current_function,
+ current_location,
+ current_stacktrace]),
+ id(Res).
+
+process_info_self_msgq_len_more_caller_body_result(Res) ->
+ [{message_queue_len, Len},
+ {current_function, {process_SUITE,process_info_self_msgq_len_more_caller_body,0}},
+ {current_location, {process_SUITE,process_info_self_msgq_len_more_caller_body,0,_}},
+ {current_stacktrace,
+ [{process_SUITE,process_info_self_msgq_len_more_caller_body,0,_} | _]}] = Res,
+ true = Len > 0,
+ ok.
+
+process_info_self_msgq_len_more_caller_tail() ->
+ process_info(self(),
+ [message_queue_len,
+ current_function,
+ current_location,
+ current_stacktrace]).
+
+process_info_self_msgq_len_more_caller_tail_result(Res) ->
+ [{message_queue_len, Len},
+ {current_function, {process_SUITE,process_info_self_msgq_len_more_caller_tail,0}},
+ {current_location, {process_SUITE,process_info_self_msgq_len_more_caller_tail,0,_}},
+ {current_stacktrace,
+ [{process_SUITE,process_info_self_msgq_len_more_caller_tail,0,_} | _]}] = Res,
+ true = Len > 0,
+ ok.
+
+
+process_info_self_msgq_len_spammer(To) ->
+ process_info_self_msgq_len_spammer(To, 10000000).
+
+process_info_self_msgq_len_spammer(_To, 0) ->
+ ok;
+process_info_self_msgq_len_spammer(To, N) ->
+ To ! hejhopp,
+ erlang:demonitor(erlang:monitor(process, To)),
+ process_info_self_msgq_len_spammer(To, N-1).
+
+pi_sig_spam_test(SpamFun, PITest, PICheckRes) ->
+ SO = erlang:system_flag(schedulers_online, 1),
+ try
+ Self = self(),
+ SigSpammer = spawn_link(SpamFun),
+ process_flag(priority, low),
+ receive after 10 -> ok end,
+ Res = PITest(),
+ process_flag(priority, high),
+ unlink(SigSpammer),
+ exit(SigSpammer, kill),
+ false = is_process_alive(SigSpammer),
+ PICheckRes(Res)
+ after
+ _ = erlang:system_flag(schedulers_online, SO)
+ end.
+
%% Tests erlang:bump_reductions/1.
bump_reductions(Config) when is_list(Config) ->
diff --git a/erts/emulator/test/signal_SUITE.erl b/erts/emulator/test/signal_SUITE.erl
index e1fcd3fbd4..ec2cbfb80e 100644
--- a/erts/emulator/test/signal_SUITE.erl
+++ b/erts/emulator/test/signal_SUITE.erl
@@ -32,6 +32,7 @@
-include_lib("common_test/include/ct.hrl").
-export([all/0, suite/0,init_per_suite/1, end_per_suite/1]).
-export([init_per_testcase/2, end_per_testcase/2]).
+-export([groups/0, init_per_group/2, end_per_group/2]).
% Test cases
-export([xm_sig_order/1,
@@ -46,7 +47,19 @@
monitor_order/1,
monitor_named_order_local/1,
monitor_named_order_remote/1,
- monitor_nodes_order/1]).
+ monitor_nodes_order/1,
+ move_msgs_off_heap_signal_basic/1,
+ move_msgs_off_heap_signal_recv/1,
+ move_msgs_off_heap_signal_exit/1,
+ move_msgs_off_heap_signal_recv_exit/1,
+ copy_literal_area_signal_basic/1,
+ copy_literal_area_signal_recv/1,
+ copy_literal_area_signal_exit/1,
+ copy_literal_area_signal_recv_exit/1,
+ simultaneous_signals_basic/1,
+ simultaneous_signals_recv/1,
+ simultaneous_signals_exit/1,
+ simultaneous_signals_recv_exit/1]).
-export([spawn_spammers/3]).
@@ -79,7 +92,29 @@ all() ->
monitor_order,
monitor_named_order_local,
monitor_named_order_remote,
- monitor_nodes_order].
+ monitor_nodes_order,
+ {group, adjust_message_queue}].
+
+groups() ->
+ [{adjust_message_queue, [],
+ [move_msgs_off_heap_signal_basic,
+ move_msgs_off_heap_signal_recv,
+ move_msgs_off_heap_signal_exit,
+ move_msgs_off_heap_signal_recv_exit,
+ copy_literal_area_signal_basic,
+ copy_literal_area_signal_recv,
+ copy_literal_area_signal_exit,
+ copy_literal_area_signal_recv_exit,
+ simultaneous_signals_basic,
+ simultaneous_signals_recv,
+ simultaneous_signals_exit,
+ simultaneous_signals_recv_exit]}].
+
+init_per_group(_GroupName, Config) ->
+ Config.
+
+end_per_group(_GroupName, Config) ->
+ Config.
%% Test that exit signals and messages are received in correct order
xm_sig_order(Config) when is_list(Config) ->
@@ -706,6 +741,281 @@ receive_filter_spam() ->
M -> M
end.
+move_msgs_off_heap_signal_basic(Config) when is_list(Config) ->
+ move_msgs_off_heap_signal_test(false, false).
+
+move_msgs_off_heap_signal_recv(Config) when is_list(Config) ->
+ move_msgs_off_heap_signal_test(true, false).
+
+move_msgs_off_heap_signal_exit(Config) when is_list(Config) ->
+ move_msgs_off_heap_signal_test(false, true).
+
+move_msgs_off_heap_signal_recv_exit(Config) when is_list(Config) ->
+ move_msgs_off_heap_signal_test(true, true).
+
+move_msgs_off_heap_signal_test(RecvPair, Exit) ->
+ erlang:trace(new_processes, true, [running_procs]),
+ SFact = test_server:timetrap_scale_factor(),
+ GoTime = erlang:monotonic_time(millisecond) + 1000*SFact,
+ ProcF = fun () ->
+ Now = erlang:monotonic_time(millisecond),
+ Tmo = case GoTime - Now of
+ Left when Left < 0 ->
+ erlang:display({go_time_passed, Left}),
+ 0;
+ Left ->
+ Left
+ end,
+ receive after Tmo -> ok end,
+ on_heap = process_flag(message_queue_data, off_heap),
+ if RecvPair -> receive_integer_pairs(infinity);
+ true -> receive after infinity -> ok end
+ end
+ end,
+ Ps = lists:map(fun (_) ->
+ spawn_opt(ProcF,
+ [link,
+ {message_queue_data, on_heap}])
+ end, lists:seq(1, 100)),
+ lists:foreach(fun (P) ->
+ lists:foreach(fun (N) when N rem 100 == 0 ->
+ P ! [N|N];
+ (N) ->
+ P ! N
+ end, lists:seq(1, 10000))
+ end, Ps),
+ Now = erlang:monotonic_time(millisecond),
+ Tmo = case GoTime - Now + 10 of
+ Left when Left < 0 ->
+ erlang:display({go_time_passed, Left}),
+ 0;
+ Left ->
+ Left
+ end,
+ receive after Tmo -> ok end,
+ if Exit ->
+ _ = lists:foldl(fun (P, N) when N rem 10 ->
+ unlink(P),
+ exit(P, terminated),
+ N+1;
+ (_P, N) ->
+ N+1
+ end,
+ 0,
+ Ps),
+ ok;
+ true ->
+ ok
+ end,
+ wait_traced_not_running(1000 + 200*SFact),
+ erlang:trace(new_processes, false, [running_procs]),
+ lists:foreach(fun (P) ->
+ unlink(P),
+ exit(P, kill)
+ end, Ps),
+ lists:foreach(fun (P) ->
+ false = is_process_alive(P)
+ end, Ps),
+ ok.
+
+copy_literal_area_signal_basic(Config) when is_list(Config) ->
+ copy_literal_area_signal_test(false, false).
+
+copy_literal_area_signal_recv(Config) when is_list(Config) ->
+ copy_literal_area_signal_test(true, false).
+
+copy_literal_area_signal_exit(Config) when is_list(Config) ->
+ copy_literal_area_signal_test(false, true).
+
+copy_literal_area_signal_recv_exit(Config) when is_list(Config) ->
+ copy_literal_area_signal_test(true, true).
+
+copy_literal_area_signal_test(RecvPair, Exit) ->
+ persistent_term:put({?MODULE, ?FUNCTION_NAME}, make_ref()),
+ Literal = persistent_term:get({?MODULE, ?FUNCTION_NAME}),
+ true = is_reference(Literal),
+ 0 = erts_debug:size_shared(Literal), %% Should be a literal...
+ ProcF = fun () ->
+ 0 = erts_debug:size_shared(Literal), %% Should be a literal...
+ if RecvPair ->
+ receive receive_pairs -> ok end,
+ receive_integer_pairs(0);
+ true ->
+ ok
+ end,
+ receive check_literal_conversion -> ok end,
+ receive
+ Literal ->
+ %% Should not be a literal anymore...
+ false = (0 == erts_debug:size_shared(Literal))
+ end
+ end,
+ PMs = lists:map(fun (_) ->
+ spawn_opt(ProcF, [link, monitor])
+ end, lists:seq(1, 100)),
+ lists:foreach(fun ({P,_M}) ->
+ lists:foreach(fun (N) when N rem 100 == 0 ->
+ P ! [N|N];
+ (N) ->
+ P ! N
+ end, lists:seq(1, 10000)),
+ P ! Literal
+ end, PMs),
+ persistent_term:erase({?MODULE, ?FUNCTION_NAME}),
+ receive after 1 -> ok end,
+ if RecvPair ->
+ lists:foreach(fun ({P,_M}) ->
+ P ! receive_pairs
+ end, PMs);
+ true ->
+ ok
+ end,
+ if Exit ->
+ _ = lists:foldl(fun ({P, _M}, N) when N rem 10 ->
+ unlink(P),
+ exit(P, terminated),
+ N+1;
+ (_PM, N) ->
+ N+1
+ end,
+ 0,
+ PMs),
+ ok;
+ true ->
+ ok
+ end,
+ literal_area_collector_test:check_idle(),
+ lists:foreach(fun ({P,_M}) ->
+ P ! check_literal_conversion
+ end, PMs),
+ lists:foreach(fun ({P, M}) ->
+ receive
+ {'DOWN', M, process, P, R} ->
+ case R of
+ normal -> ok;
+ terminated -> ok
+ end
+ end
+ end, PMs),
+ ok.
+
+simultaneous_signals_basic(Config) when is_list(Config) ->
+ simultaneous_signals_test(false, false).
+
+simultaneous_signals_recv(Config) when is_list(Config) ->
+ simultaneous_signals_test(true, false).
+
+simultaneous_signals_exit(Config) when is_list(Config) ->
+ simultaneous_signals_test(false, true).
+
+simultaneous_signals_recv_exit(Config) when is_list(Config) ->
+ simultaneous_signals_test(true, true).
+
+simultaneous_signals_test(RecvPairs, Exit) ->
+ erlang:trace(new_processes, true, [running_procs]),
+ persistent_term:put({?MODULE, ?FUNCTION_NAME}, make_ref()),
+ Literal = persistent_term:get({?MODULE, ?FUNCTION_NAME}),
+ true = is_reference(Literal),
+ 0 = erts_debug:size_shared(Literal), %% Should be a literal...
+ SFact = test_server:timetrap_scale_factor(),
+ GoTime = erlang:monotonic_time(millisecond) + 1000*SFact,
+ ProcF = fun () ->
+ 0 = erts_debug:size_shared(Literal), %% Should be a literal...
+ Now = erlang:monotonic_time(millisecond),
+ Tmo = case GoTime - Now of
+ Left when Left < 0 ->
+ erlang:display({go_time_passed, Left}),
+ 0;
+ Left ->
+ Left
+ end,
+ receive after Tmo -> ok end,
+ on_heap = process_flag(message_queue_data, off_heap),
+ if RecvPairs -> receive_integer_pairs(0);
+ true -> ok
+ end,
+ receive check_literal_conversion -> ok end,
+ receive
+ Literal ->
+ %% Should not be a literal anymore...
+ false = (0 == erts_debug:size_shared(Literal))
+ end
+ end,
+ PMs = lists:map(fun (_) ->
+ spawn_opt(ProcF,
+ [link,
+ monitor,
+ {message_queue_data, on_heap}])
+ end, lists:seq(1, 100)),
+ lists:foreach(fun ({P,_M}) ->
+ lists:foreach(fun (N) when N rem 100 == 0 ->
+ P ! [N|N];
+ (N) ->
+ P ! N
+ end, lists:seq(1, 10000)),
+ P ! Literal
+ end, PMs),
+ Now = erlang:monotonic_time(millisecond),
+ Tmo = case GoTime - Now - 5 of % a bit earlier...
+ Left when Left < 0 ->
+ erlang:display({go_time_passed, Left}),
+ 0;
+ Left ->
+ Left
+ end,
+ receive after Tmo -> ok end,
+ persistent_term:erase({?MODULE, ?FUNCTION_NAME}),
+ receive after 10 -> ok end,
+ if Exit ->
+ _ = lists:foldl(fun ({P, _M}, N) when N rem 10 ->
+ unlink(P),
+ exit(P, terminated),
+ N+1;
+ (_PM, N) ->
+ N+1
+ end,
+ 0,
+ PMs),
+ ok;
+ true ->
+ ok
+ end,
+ wait_traced_not_running(1000 + 200*SFact),
+ erlang:trace(new_processes, false, [running_procs]),
+ literal_area_collector_test:check_idle(),
+ lists:foreach(fun ({P,_M}) ->
+ P ! check_literal_conversion
+ end, PMs),
+ lists:foreach(fun ({P, M}) ->
+ receive
+ {'DOWN', M, process, P, R} ->
+ case R of
+ normal -> ok;
+ terminated -> ok
+ end
+ end
+ end, PMs),
+ ok.
+
+
+wait_traced_not_running(Tmo) ->
+ receive
+ {trace,_,What,_} when What == in;
+ What == out ->
+ wait_traced_not_running(Tmo)
+ after
+ Tmo ->
+ ok
+ end.
+
+receive_integer_pairs(Tmo) ->
+ receive
+ [N|N] ->
+ receive_integer_pairs(Tmo)
+ after
+ Tmo ->
+ ok
+ end.
%%
%% -- Internal utils --------------------------------------------------------
diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in
index fa215618aa..64d8de67df 100644
--- a/erts/etc/unix/etp-commands.in
+++ b/erts/etc/unix/etp-commands.in
@@ -2479,7 +2479,7 @@ define etp-proc-state-int
printf "active | "
end
if ($arg0 & 0x1000)
- printf "unused | "
+ printf "maybe-self-sigs | "
end
if ($arg0 & 0x800)
printf "exiting | "
diff --git a/erts/preloaded/ebin/erts_literal_area_collector.beam b/erts/preloaded/ebin/erts_literal_area_collector.beam
index d54baaf4a2..165bfa4622 100644
--- a/erts/preloaded/ebin/erts_literal_area_collector.beam
+++ b/erts/preloaded/ebin/erts_literal_area_collector.beam
Binary files differ
diff --git a/erts/preloaded/src/erts_literal_area_collector.erl b/erts/preloaded/src/erts_literal_area_collector.erl
index 8a73ed1685..bb2ad6b919 100644
--- a/erts/preloaded/src/erts_literal_area_collector.erl
+++ b/erts/preloaded/src/erts_literal_area_collector.erl
@@ -62,39 +62,46 @@ msg_loop(Area, {Ongoing, NeedIReq} = OReqInfo, GcOutstnd, NeedGC) ->
switch_area();
%% Process (_Pid) has completed the request...
- {copy_literals, {Area, _GcAllowed, _Pid}, ok} when Ongoing == 1,
- NeedIReq == [] ->
+ {copy_literals, {Area, _ReqType, _Pid}, ok} when Ongoing == 1,
+ NeedIReq == [] ->
switch_area(); %% Last process completed...
- {copy_literals, {Area, false, _Pid}, ok} ->
+ {copy_literals, {Area, init, _Pid}, ok} ->
msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq),
GcOutstnd, NeedGC);
- {copy_literals, {Area, true, _Pid}, ok} when NeedGC == [] ->
+ {copy_literals, {Area, ReqType, _Pid}, ok} when NeedGC == [],
+ ReqType /= init ->
msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq),
GcOutstnd-1, []);
- {copy_literals, {Area, true, _Pid}, ok} ->
- send_copy_req(hd(NeedGC), Area, true),
- msg_loop(Area, {Ongoing-1, NeedIReq}, GcOutstnd, tl(NeedGC));
+ {copy_literals, {Area, ReqType, _Pid}, ok} when ReqType /= init ->
+ [{GCPid,GCWork} | NewNeedGC] = NeedGC,
+ send_copy_req(GCPid, Area, GCWork),
+ msg_loop(Area, {Ongoing-1, NeedIReq}, GcOutstnd, NewNeedGC);
%% Process (Pid) failed to complete the request
%% since it needs to garbage collect in order to
%% complete the request...
- {copy_literals, {Area, false, Pid}, need_gc} when GcOutstnd < ?MAX_GC_OUTSTND ->
- send_copy_req(Pid, Area, true),
+ {copy_literals, {Area, init, Pid}, GCWork} when GcOutstnd
+ < ?MAX_GC_OUTSTND ->
+ send_copy_req(Pid, Area, GCWork),
msg_loop(Area, OReqInfo, GcOutstnd+1, NeedGC);
- {copy_literals, {Area, false, Pid}, need_gc} ->
+ {copy_literals, {Area, init, Pid}, GCWork} ->
msg_loop(Area, check_send_copy_req(Area, Ongoing, NeedIReq),
- GcOutstnd, [Pid|NeedGC]);
+ GcOutstnd, [{Pid,GCWork} | NeedGC]);
%% Not handled message regarding the area that we
%% currently are working with. Crash the VM so
%% we notice this bug...
- {copy_literals, {Area, _, _}, _} = Msg when erlang:is_reference(Area) ->
+ {copy_literals, {Area, _, _}, _} = Msg ->
exit({not_handled_message, Msg});
{change_prio, From, Ref, Prio} ->
change_prio(From, Ref, Prio),
msg_loop(Area, OReqInfo, GcOutstnd, NeedGC);
+ {get_status, Ref, From} when is_pid(From); is_reference(From) ->
+ From ! {Ref, if Ongoing == 0 -> idle; true -> working end},
+ msg_loop(Area, OReqInfo, GcOutstnd, NeedGC);
+
%% Unexpected garbage message. Get rid of it...
_Ignore ->
msg_loop(Area, OReqInfo, GcOutstnd, NeedGC)
@@ -126,7 +133,7 @@ switch_area() ->
check_send_copy_req(_Area, Ongoing, []) ->
{Ongoing, []};
check_send_copy_req(Area, Ongoing, [Pid|Pids]) ->
- send_copy_req(Pid, Area, false),
+ send_copy_req(Pid, Area, init),
{Ongoing+1, Pids}.
send_copy_reqs(Ps, Area, OReqLim) ->
@@ -137,23 +144,23 @@ send_copy_reqs([], _Area, _OReqLim, N) ->
send_copy_reqs(Ps, _Area, OReqLim, N) when N >= OReqLim ->
{N, Ps};
send_copy_reqs([P|Ps], Area, OReqLim, N) ->
- send_copy_req(P, Area, false),
+ send_copy_req(P, Area, init),
send_copy_reqs(Ps, Area, OReqLim, N+1).
-send_copy_req(P, Area, GC) ->
- erts_literal_area_collector:send_copy_request(P, Area, GC).
+send_copy_req(P, Area, How) ->
+ erts_literal_area_collector:send_copy_request(P, Area, How).
-spec release_area_switch() -> boolean().
release_area_switch() ->
erlang:nif_error(undef). %% Implemented in beam_bif_load.c
--spec send_copy_request(To, AreaId, GcAllowed) -> 'ok' when
+-spec send_copy_request(To, AreaId, How) -> 'ok' when
To :: pid(),
AreaId :: term(),
- GcAllowed :: boolean().
+ How :: 'init' | 'check_gc' | 'need_gc'.
-send_copy_request(_To, _AreaId, _GcAllowed) ->
+send_copy_request(_To, _AreaId, _How) ->
erlang:nif_error(undef). %% Implemented in beam_bif_load.c
change_prio(From, Ref, Prio) ->