From 49681bd7458649b34567bdad6918bf579846d80e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20H=C3=B6gberg?= Date: Wed, 23 Feb 2022 15:00:22 +0100 Subject: erts: Fix parallel signal optimization When the parallel signal optimization was enabled and signals were sent without a known sender (e.g. a monitor-by-name firing), they always landed in signal slot 0, which was rarely the same slot as messages sent from the same process, breaking our signal order guarantees. This commit fixes that by always keeping track of the sender regardless of how and where signal are sent. --- erts/emulator/beam/atom.names | 1 + erts/emulator/beam/bif.c | 35 ++- erts/emulator/beam/break.c | 15 +- erts/emulator/beam/dist.c | 93 ++++--- erts/emulator/beam/erl_message.c | 86 ++++--- erts/emulator/beam/erl_nif.c | 8 +- erts/emulator/beam/erl_proc_sig_queue.c | 425 +++++++++++++++++++------------- erts/emulator/beam/erl_proc_sig_queue.h | 195 +++++++++------ erts/emulator/beam/erl_process.c | 20 +- erts/emulator/beam/external.c | 3 +- erts/emulator/beam/io.c | 119 +++++---- erts/emulator/test/signal_SUITE.erl | 114 ++++++--- 12 files changed, 685 insertions(+), 429 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index c60a51673b..9ac4e2abfe 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -659,6 +659,7 @@ atom spawn_init atom spawn_reply atom spawn_request atom spawn_request_yield +atom spawn_service atom spawned atom ssl_tls atom stack_size diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index b9b3a93ee3..7069a5ead2 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -129,8 +129,10 @@ BIF_RETTYPE link_1(BIF_ALIST_1) rlnk = erts_link_internal_create(ERTS_LNK_TYPE_PROC, BIF_P->common.id); - if (erts_proc_sig_send_link(BIF_P, BIF_ARG_1, rlnk)) + if (erts_proc_sig_send_link(&BIF_P->common, BIF_P->common.id, + BIF_ARG_1, rlnk)) { BIF_RET(am_true); + } erts_link_tree_delete(&ERTS_P_LINKS(BIF_P), lnk); erts_link_internal_release(lnk); @@ -251,8 +253,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) } erts_link_set_dead_dist(&elnk->ld.dist, dep->sysname); } - erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, &elnk->ld.dist, - am_noconnection, NIL); + erts_proc_sig_send_link_exit_noconnection(&elnk->ld.dist); break; case ERTS_DSIG_PREP_PENDING: @@ -395,7 +396,7 @@ demonitor(Process *c_p, Eterm ref, Eterm *multip) } case ERTS_MON_TYPE_PROC: - erts_proc_sig_send_demonitor(mon); + erts_proc_sig_send_demonitor(&c_p->common, c_p->common.id, 0, mon); return am_true; case ERTS_MON_TYPE_DIST_PROC: { @@ -677,9 +678,12 @@ static BIF_RETTYPE monitor(Process *c_p, Eterm type, Eterm target, mdp->origin.flags |= add_oflags; erts_monitor_tree_insert(&ERTS_P_MONITORS(c_p), &mdp->origin); - if (!erts_proc_sig_send_monitor(&mdp->u.target, id)) - erts_proc_sig_send_monitor_down(&mdp->u.target, + if (!erts_proc_sig_send_monitor(&c_p->common, c_p->common.id, + &mdp->u.target, id)) { + erts_proc_sig_send_monitor_down(NULL, id, + &mdp->u.target, am_noproc); + } } goto done; @@ -722,7 +726,9 @@ static BIF_RETTYPE monitor(Process *c_p, Eterm type, Eterm target, case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: erts_monitor_set_dead_dist(&mdp->u.target, dep->sysname); - erts_proc_sig_send_monitor_down(&mdp->u.target, am_noconnection); + erts_proc_sig_send_monitor_down(NULL, id, + &mdp->u.target, + am_noconnection); code = ERTS_DSIG_SEND_OK; break; @@ -786,8 +792,10 @@ static BIF_RETTYPE monitor(Process *c_p, Eterm type, Eterm target, mdp->origin.flags |= add_oflags; erts_monitor_tree_insert(&ERTS_P_MONITORS(c_p), &mdp->origin); prt = erts_port_lookup(id, ERTS_PORT_SFLGS_INVALID_LOOKUP); - if (!prt || erts_port_monitor(c_p, prt, &mdp->u.target) == ERTS_PORT_OP_DROPPED) - erts_proc_sig_send_monitor_down(&mdp->u.target, am_noproc); + if (!prt || erts_port_monitor(c_p, prt, &mdp->u.target) == ERTS_PORT_OP_DROPPED) { + erts_proc_sig_send_monitor_down(prt ? &prt->common : NULL, id, + &mdp->u.target, am_noproc); + } goto done; } @@ -1102,7 +1110,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(BIF_P), BIF_ARG_1); if (ilnk && !ilnk->unlinking) { - Uint64 id = erts_proc_sig_send_unlink(BIF_P, + Uint64 id = erts_proc_sig_send_unlink(&BIF_P->common, BIF_P->common.id, &ilnk->link); if (id) @@ -1135,7 +1143,8 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) else { ErtsSigUnlinkOp *sulnk; - sulnk = erts_proc_sig_make_unlink_op(BIF_P, BIF_P->common.id); + sulnk = erts_proc_sig_make_unlink_op(&BIF_P->common, + BIF_P->common.id); ilnk->unlinking = sulnk->id; #ifdef DEBUG ref = NIL; @@ -1173,7 +1182,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) if (elnk->unlinking) BIF_RET(am_true); - unlink_id = erts_proc_sig_new_unlink_id(BIF_P); + unlink_id = erts_proc_sig_new_unlink_id(&BIF_P->common); elnk->unlinking = unlink_id; code = erts_dsig_prepare(&ctx, dep, BIF_P, ERTS_PROC_LOCK_MAIN, @@ -1547,7 +1556,7 @@ static BIF_RETTYPE send_exit_signal_bif(Process *c_p, Eterm id, Eterm reason, in && c_p->common.id == id && (reason == am_kill || !(c_p->flags & F_TRAP_EXIT))); - erts_proc_sig_send_exit(c_p, c_p->common.id, id, + erts_proc_sig_send_exit(&c_p->common, c_p->common.id, id, reason, NIL, exit2_suicide); if (!exit2_suicide) ERTS_BIF_PREP_RET(ret_val, am_true); diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c index e1071506f5..e3a43e14f0 100644 --- a/erts/emulator/beam/break.c +++ b/erts/emulator/beam/break.c @@ -134,12 +134,19 @@ process_killer(void) if ((j = sys_get_key(0)) <= 0) erts_exit(0, ""); switch(j) { - case 'k': + case 'k': + { + Process *init_proc; + ASSERT(erts_init_process_id != ERTS_INVALID_PID); + init_proc = erts_proc_lookup_raw(erts_init_process_id); + /* Send a 'kill' exit signal from init process */ - erts_proc_sig_send_exit(NULL, erts_init_process_id, - rp->common.id, am_kill, NIL, - 0); + erts_proc_sig_send_exit(&init_proc->common, + erts_init_process_id, + rp->common.id, + am_kill, NIL, 0); + } case 'n': br = 1; break; case 'r': return; default: return; diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index cab8c4b9eb..af2b03c65e 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -267,10 +267,23 @@ get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs) static int monitor_connection_down(ErtsMonitor *mon, void *unused, Sint reds) { - if (erts_monitor_is_origin(mon)) - erts_proc_sig_send_demonitor(mon); - else - erts_proc_sig_send_monitor_down(mon, am_noconnection); + const ErtsMonitorData *data = erts_monitor_to_data(mon); + Eterm from; + + if (erts_monitor_is_origin(mon)) { + from = data->u.target.other.item; + } else { + from = data->origin.other.item; + } + + ASSERT(!is_internal_pid(from) && is_internal_pid(mon->other.item)); + + if (erts_monitor_is_origin(mon)) { + erts_proc_sig_send_demonitor(NULL, from, 0, mon); + } else { + erts_proc_sig_send_monitor_down(NULL, from, mon, am_noconnection); + } + return ERTS_MON_LNK_FIRE_REDS; } @@ -282,8 +295,7 @@ static int dist_pend_spawn_exit_connection_down(ErtsMonitor *mon, void *unused, static int link_connection_down(ErtsLink *lnk, void *vdist, Sint reds) { - erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, lnk, - am_noconnection, NIL); + erts_proc_sig_send_link_exit_noconnection(lnk); return ERTS_MON_LNK_FIRE_REDS; } @@ -828,8 +840,10 @@ inc_no_nodes(void) static void kill_dist_ctrl_proc(void *vpid) { - erts_proc_sig_send_exit(NULL, (Eterm) vpid, (Eterm) vpid, - am_kill, NIL, 0); + /* Send a 'kill' exit signal from init process */ + Process *init_proc = erts_proc_lookup_raw(erts_init_process_id); + erts_proc_sig_send_exit(&init_proc->common, erts_init_process_id, + (Eterm)vpid, am_kill, NIL, 0); } static void @@ -1315,7 +1329,7 @@ erts_dsig_send_unlink(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, Uint6 * end up in an inconsistent state as we could before the new link * protocol was introduced... */ - erts_proc_sig_send_dist_unlink_ack(ctx->c_p, ctx->dep, ctx->connection_id, + erts_proc_sig_send_dist_unlink_ack(ctx->dep, ctx->connection_id, remote, local, id); ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_UNLINK), local, remote); } @@ -2077,11 +2091,10 @@ int erts_net_message(Port *prt, ASSERT(eq(ldp->proc.other.item, from)); code = erts_link_dist_insert(&ldp->dist, ede.mld); - if (erts_proc_sig_send_link(NULL, to, &ldp->proc)) { + if (erts_proc_sig_send_link(NULL, from, to, &ldp->proc)) { if (!code) { /* Race: connection already down => send link exit */ - erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, &ldp->dist, - am_noconnection, NIL); + erts_proc_sig_send_link_exit_noconnection(&ldp->dist); } break; /* Done */ } @@ -2164,8 +2177,7 @@ int erts_net_message(Port *prt, if (is_not_internal_pid(to)) goto invalid_message; - erts_proc_sig_send_dist_unlink_ack(NULL, dep, conn_id, - from, to, id); + erts_proc_sig_send_dist_unlink_ack(dep, conn_id, from, to, id); break; } @@ -2221,8 +2233,10 @@ int erts_net_message(Port *prt, break; } - if (erts_proc_sig_send_monitor(&mdp->u.target, pid)) + if (erts_proc_sig_send_monitor(NULL, watcher, + &mdp->u.target, pid)) { break; /* done */ + } /* Failed to send to local proc; cleanup reply noproc... */ @@ -2243,8 +2257,7 @@ int erts_net_message(Port *prt, case DOP_DEMONITOR_P: /* A remote node informs us that a local pid in no longer monitored - We get {DOP_DEMONITOR_P, Remote pid, Local pid or name, ref}, - We need only the ref of course */ + We get {DOP_DEMONITOR_P, Remote pid, Local pid or name, ref}. */ if (tuple_arity != 4) { goto invalid_message; @@ -2261,9 +2274,9 @@ int erts_net_message(Port *prt, if (is_not_external_pid(watcher) || external_pid_dist_entry(watcher) != dep) goto invalid_message; - if (is_internal_pid(watched)) - erts_proc_sig_send_dist_demonitor(watched, ref); - else if (is_external_pid(watched) + if (is_internal_pid(watched)) { + erts_proc_sig_send_dist_demonitor(watcher, watched, ref); + } else if (is_external_pid(watched) && external_pid_dist_entry(watched) == erts_this_dist_entry) { /* old incarnation; ignore it */ ; @@ -2276,13 +2289,14 @@ int erts_net_message(Port *prt, mon = erts_monitor_tree_lookup(ede.mld->orig_name_monitors, ref); if (mon) erts_monitor_tree_delete(&ede.mld->orig_name_monitors, mon); - } - else + } else { mon = NULL; + } erts_mtx_unlock(&ede.mld->mtx); - if (mon) - erts_proc_sig_send_demonitor(mon); + if (mon) { + erts_proc_sig_send_demonitor(NULL, watcher, 0, mon); + } } else goto invalid_message; @@ -2367,13 +2381,14 @@ int erts_net_message(Port *prt, #ifdef ERTS_DIST_MSG_DBG dist_msg_dbg(edep, "MSG", buf, orig_len); #endif - to = tuple[3]; - if (is_not_pid(to)) - goto invalid_message; - rp = erts_proc_lookup(to); + from = tuple[2]; + to = tuple[3]; + if (is_not_pid(to)) + goto invalid_message; + rp = erts_proc_lookup(to); - if (rp) { - erts_queue_dist_message(rp, 0, edep, ede_hfrag, token, am_Empty); + if (rp) { + erts_queue_dist_message(rp, 0, edep, ede_hfrag, token, from); } else if (ede_hfrag != NULL) { erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag)); free_message_buffer(ede_hfrag); @@ -2398,12 +2413,12 @@ int erts_net_message(Port *prt, #ifdef ERTS_DIST_MSG_DBG dist_msg_dbg(edep, "ALIAS MSG", buf, orig_len); #endif - - to = tuple[3]; - if (is_not_ref(to)) - goto invalid_message; - - erts_proc_sig_send_dist_to_alias(to, edep, ede_hfrag, token); + from = tuple[2]; + to = tuple[3]; + if (is_not_ref(to)) { + goto invalid_message; + } + erts_proc_sig_send_dist_to_alias(from, to, edep, ede_hfrag, token); break; case DOP_PAYLOAD_MONITOR_P_EXIT: @@ -2841,10 +2856,8 @@ int erts_net_message(Port *prt, ref, dep->mld); } - } - else if (lnk && !link_inserted) { - erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, &ldp->dist, - am_noconnection, NIL); + } else if (lnk && !link_inserted) { + erts_proc_sig_send_link_exit_noconnection(&ldp->dist); } break; diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 14b6d0a121..a472772547 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -278,14 +278,14 @@ erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_s void erts_queue_dist_message(Process *rcvr, - ErtsProcLocks rcvr_locks, - ErtsDistExternal *dist_ext, + ErtsProcLocks rcvr_locks, + ErtsDistExternal *dist_ext, ErlHeapFragment *hfrag, - Eterm token, + Eterm token, Eterm from) { - ErtsMessage* mp; erts_aint_t state; + ErtsMessage *mp; ERTS_LC_ASSERT(rcvr_locks == erts_proc_lc_my_proc_locks(rcvr)); @@ -320,40 +320,62 @@ erts_queue_dist_message(Process *rcvr, #endif ERL_MESSAGE_TOKEN(mp) = token; - if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) { - if (erts_proc_trylock(rcvr, ERTS_PROC_LOCK_MSGQ) == EBUSY) { - ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; - ErtsProcLocks unlocks = - rcvr_locks & ERTS_PROC_LOCKS_HIGHER_THAN(ERTS_PROC_LOCK_MSGQ); - if (unlocks) { - erts_proc_unlock(rcvr, unlocks); - need_locks |= unlocks; - } - erts_proc_lock(rcvr, need_locks); - } + /* If the sender is known, try to enqueue to an outer message queue buffer + * instead of directly to the outer message queue. + * + * Otherwise, the code below flushes the buffer before adding the message + * to ensure the signal order is maintained. This should only happen for + * the relatively uncommon DOP_SEND/DOP_SEND_TT operations. */ + if (is_external_pid(from) && + erts_proc_sig_queue_try_enqueue_to_buffer(from, rcvr, rcvr_locks, + mp, &mp->next, + NULL, 1, 0)) { + return; } + if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) { + ErtsProcLocks unlocks; + + unlocks = rcvr_locks & ERTS_PROC_LOCKS_HIGHER_THAN(ERTS_PROC_LOCK_MSGQ); + erts_proc_unlock(rcvr, unlocks); + + erts_proc_sig_queue_lock(rcvr); + } state = erts_atomic32_read_acqb(&rcvr->state); if (state & ERTS_PSFLG_EXITING) { - if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) - erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); - /* Drop message if receiver is exiting or has a pending exit ... */ - erts_cleanup_messages(mp); - } - else { - LINK_MESSAGE(rcvr, mp); + if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) { + erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); + } + + /* Drop message if receiver is exiting or has a pending exit ... */ + erts_cleanup_messages(mp); + } else { + if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) { + /* Install buffers for the outer message if the heuristic + * indicates that this is beneficial. It is best to do this as + * soon as possible to avoid as much contention as possible. */ + erts_proc_sig_queue_maybe_install_buffers(rcvr, state); - if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) - erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); + /* Flush outer signal queue buffers, if such buffers are + * installed, to ensure that messages from the same + * process cannot be reordered. */ + erts_proc_sig_queue_flush_buffers(rcvr); + } + + LINK_MESSAGE(rcvr, mp); + + if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) { + erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); + } - erts_proc_notify_new_message(rcvr, rcvr_locks); + erts_proc_notify_new_message(rcvr, rcvr_locks); } } /* Add messages last in message queue */ static void -queue_messages(Process* sender, /* is NULL if the sender is not a local process */ +queue_messages(Eterm from, Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* first, @@ -382,7 +404,7 @@ queue_messages(Process* sender, /* is NULL if the sender is not a local process * Try to enqueue to an outer message queue buffer instead of * directly to the outer message queue */ - if (erts_proc_sig_queue_try_enqueue_to_buffer(sender, receiver, receiver_locks, + if (erts_proc_sig_queue_try_enqueue_to_buffer(from, receiver, receiver_locks, first, last, NULL, len, 0)) { return; } @@ -481,7 +503,7 @@ erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks, ERL_MESSAGE_TERM(mp) = msg; ERL_MESSAGE_FROM(mp) = from; ERL_MESSAGE_TOKEN(mp) = am_undefined; - queue_messages(NULL, receiver, receiver_locks, mp, &mp->next, 1); + queue_messages(from, receiver, receiver_locks, mp, &mp->next, 1); } /** @@ -498,7 +520,7 @@ erts_queue_message_token(Process* receiver, ErtsProcLocks receiver_locks, ERL_MESSAGE_TERM(mp) = msg; ERL_MESSAGE_FROM(mp) = from; ERL_MESSAGE_TOKEN(mp) = token; - queue_messages(NULL, receiver, receiver_locks, mp, &mp->next, 1); + queue_messages(from, receiver, receiver_locks, mp, &mp->next, 1); } @@ -519,7 +541,7 @@ erts_queue_proc_message(Process* sender, { ERL_MESSAGE_TERM(mp) = msg; ERL_MESSAGE_FROM(mp) = sender->common.id; - queue_messages(sender, receiver, receiver_locks, + queue_messages(sender->common.id, receiver, receiver_locks, prepend_pending_sig_maybe(sender, receiver, mp), &mp->next, 1); } @@ -530,7 +552,7 @@ erts_queue_proc_messages(Process* sender, Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* first, ErtsMessage** last, Uint len) { - queue_messages(sender, receiver, receiver_locks, + queue_messages(sender->common.id, receiver, receiver_locks, prepend_pending_sig_maybe(sender, receiver, first), last, len); } @@ -988,7 +1010,7 @@ change_off_heap_msgq(void *vcohmq) * process to complete this change itself. */ cohmq = (ErtsChangeOffHeapMessageQueue *) vcohmq; - erts_proc_sig_send_move_msgq_off_heap(NULL, cohmq->pid); + erts_proc_sig_send_move_msgq_off_heap(cohmq->pid); erts_free(ERTS_ALC_T_MSGQ_CHNG, vcohmq); } diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c index f37101645f..c17921d9d4 100644 --- a/erts/emulator/beam/erl_nif.c +++ b/erts/emulator/beam/erl_nif.c @@ -2610,7 +2610,8 @@ static int dtor_demonitor(ErtsMonitor* mon, void* context, Sint reds) ASSERT(erts_monitor_is_origin(mon)); ASSERT(is_internal_pid(mon->other.item)); - erts_proc_sig_send_demonitor(mon); + erts_proc_sig_send_demonitor(NULL, am_system, 1, mon); + return 1; } @@ -3629,7 +3630,8 @@ int enif_monitor_process(ErlNifEnv* env, void* obj, const ErlNifPid* target_pid, rmon_refc_inc(rm); erts_mtx_unlock(&rm->lock); - if (!erts_proc_sig_send_monitor(&mdp->u.target, target_pid->pid)) { + if (!erts_proc_sig_send_monitor(NULL, am_system, &mdp->u.target, + target_pid->pid)) { /* Failed to send monitor signal; cleanup... */ #ifdef DEBUG ErtsBinary* bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(rsrc); @@ -3688,7 +3690,7 @@ int enif_demonitor_process(ErlNifEnv* env, void* obj, const ErlNifMonitor* monit ASSERT(erts_monitor_is_origin(mon)); ASSERT(is_internal_pid(mon->other.item)); - erts_proc_sig_send_demonitor(mon); + erts_proc_sig_send_demonitor(NULL, am_system, 1, mon); return 0; } diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 5eb9904067..8dab59f15f 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -405,9 +405,18 @@ destroy_sig_group_leader(ErtsSigGroupLeader *sgl) } static ERTS_INLINE void -sig_enqueue_trace(Process *c_p, ErtsMessage **sigp, int op, - Process *rp, ErtsMessage ***last_next) +sig_enqueue_trace(ErtsPTabElementCommon *sender, Eterm from, + ErtsMessage **sigp, int op, Process *rp, + ErtsMessage ***last_next) { + Process *c_p; + + if (sender == NULL || !is_internal_pid(from)) { + return; + } + + c_p = ErtsContainerStruct(sender, Process, common); + switch (op) { case ERTS_SIG_Q_OP_LINK: if (c_p @@ -430,25 +439,27 @@ sig_enqueue_trace(Process *c_p, ErtsMessage **sigp, int op, ti->common.specific.next = &ti->common.next; ti->common.tag = tag; ti->flags_on = ERTS_TRACE_FLAGS(c_p) & TRACEE_FLAGS; - if (!(ti->flags_on & F_TRACE_SOL1)) + + if (!(ti->flags_on & F_TRACE_SOL1)) { ti->flags_off = 0; - else { + } else { ti->flags_off = F_TRACE_SOL1|F_TRACE_SOL; erts_proc_lock(c_p, ERTS_PROC_LOCKS_ALL_MINOR); ERTS_TRACE_FLAGS(c_p) &= ~(F_TRACE_SOL1|F_TRACE_SOL); erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR); } + erts_tracer_update(&ti->tracer, ERTS_TRACER(c_p)); *sigp = (ErtsMessage *) ti; - if (!*last_next || *last_next == sigp) + + if (!*last_next || *last_next == sigp) { *last_next = &ti->common.next; + } } break; - #ifdef USE_VM_PROBES case ERTS_SIG_Q_OP_EXIT: case ERTS_SIG_Q_OP_EXIT_LINKED: - if (DTRACE_ENABLED(process_exit_signal)) { ErtsMessage* sig = *sigp; Uint16 type = ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag); @@ -479,9 +490,7 @@ sig_enqueue_trace(Process *c_p, ErtsMessage **sigp, int op, } } break; - #endif - default: break; } @@ -704,7 +713,8 @@ check_push_msgq_len_offs_marker(Process *rp, ErtsSignal *sig); static int -proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) +proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, + ErtsSignal *sig, int force_flush, int op) { int res; Process *rp; @@ -714,6 +724,20 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) erts_aint32_t state; ErtsSignal *pend_sig; + ASSERT(sender == NULL || sender->id == from); + + /* Tracing requires sender for local procs and ports. The assertions below + * will not catch errors after time-of-death, but ought to find most + * problems. */ + ASSERT(sender != NULL || + (is_normal_sched && esdp->pending_signal.sig == sig) || + (!(is_internal_pid(from) && + erts_proc_lookup(from) != NULL) && + !(is_internal_port(from) && + erts_port_lookup(from, ERTS_PORT_SFLGS_INVALID_LOOKUP) != NULL))); + + ASSERT(is_value(from) && is_internal_pid(pid)); + if (is_normal_sched) { pend_sig = esdp->pending_signal.sig; if (op == ERTS_SIG_Q_OP_MONITOR @@ -732,8 +756,10 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) if (pend_sig != sig) { /* Switch them and send previously pending signal instead */ Eterm pend_to = esdp->pending_signal.to; + esdp->pending_signal.sig = sig; esdp->pending_signal.to = pid; + sig = pend_sig; pid = pend_to; } @@ -747,9 +773,12 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) #endif pend_sig = NULL; } + rp = erts_proc_lookup_raw(pid); if (!rp) { - erts_proc_sig_send_monitor_down((ErtsMonitor*)sig, am_noproc); + erts_proc_sig_send_monitor_down(sender, from, + (ErtsMonitor*)sig, + am_noproc); return 1; } } @@ -760,7 +789,9 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) rp = erts_proc_lookup_raw(pid); if (!rp) { - erts_proc_sig_send_monitor_down((ErtsMonitor*)pend_sig, am_noproc); + erts_proc_sig_send_monitor_down(sender, from, + (ErtsMonitor*)pend_sig, + am_noproc); return 0; } @@ -775,14 +806,16 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) else { pend_sig = NULL; rp = erts_proc_lookup_raw(pid); - if (!rp) + if (!rp) { return 0; + } } } else { rp = erts_proc_lookup_raw_inc_refc(pid); - if (!rp) + if (!rp) { return 0; + } pend_sig = NULL; } @@ -794,16 +827,18 @@ first_last_done: sig->common.specific.next = NULL; /* may add signals before sig */ - sig_enqueue_trace(c_p, sigp, op, rp, &last_next); + sig_enqueue_trace(sender, from, sigp, op, rp, &last_next); last->next = NULL; - if (op != ERTS_SIG_Q_OP_PROCESS_INFO && - erts_proc_sig_queue_try_enqueue_to_buffer(c_p, rp, 0, first, + if (!force_flush && op != ERTS_SIG_Q_OP_PROCESS_INFO && + erts_proc_sig_queue_try_enqueue_to_buffer(from, rp, 0, first, &last->next, last_next, 0, 1)) { - if (!is_normal_sched) + if (!is_normal_sched) { erts_proc_dec_refc(rp); + } + return 1; } @@ -811,11 +846,15 @@ first_last_done: state = erts_atomic32_read_nob(&rp->state); - erts_proc_sig_queue_maybe_install_buffers(rp, state); + if (force_flush) { + erts_proc_sig_queue_flush_buffers(rp); + } else { + erts_proc_sig_queue_maybe_install_buffers(rp, state); + } - if (ERTS_PSFLG_FREE & state) + if (ERTS_PSFLG_FREE & state) { res = 0; - else { + } else { state = enqueue_signals(rp, first, &last->next, last_next, 0, state, &rp->sig_inq); @@ -829,38 +868,42 @@ first_last_done: if (res == 0) { sig_enqueue_trace_cleanup(first, sig); if (pend_sig) { - erts_proc_sig_send_monitor_down((ErtsMonitor*)pend_sig, am_noproc); + erts_proc_sig_send_monitor_down(sender, from, + (ErtsMonitor*)pend_sig, am_noproc); if (sig == pend_sig) { /* We did a switch, callers signal is now pending (still ok) */ ASSERT(esdp->pending_signal.sig); res = 1; } } - } - else + } else { erts_proc_notify_new_sig(rp, state, 0); + } - if (!is_normal_sched) + if (!is_normal_sched) { erts_proc_dec_refc(rp); + } return res; } -void erts_proc_sig_send_pending(ErtsSchedulerData* esdp) +void erts_proc_sig_send_pending(Process *c_p, ErtsSchedulerData* esdp) { - ErtsSignal* sig = esdp->pending_signal.sig; + ErtsSignal *sig = esdp->pending_signal.sig; + Eterm to = esdp->pending_signal.to; int op; ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL); + ASSERT(c_p && c_p == esdp->pending_signal.dbg_from); ASSERT(sig); - ASSERT(is_internal_pid(esdp->pending_signal.to)); + ASSERT(is_internal_pid(to)); op = ERTS_SIG_Q_OP_MONITOR; ASSERT(op == ERTS_PROC_SIG_OP(sig->common.tag)); - if (!proc_queue_signal(NULL, esdp->pending_signal.to, sig, op)) { + if (!proc_queue_signal(&c_p->common, c_p->common.id, to, sig, 0, op)) { ErtsMonitor* mon = (ErtsMonitor*)sig; - erts_proc_sig_send_monitor_down(mon, am_noproc); + erts_proc_sig_send_monitor_down(NULL, to, mon, am_noproc); } } @@ -1127,7 +1170,7 @@ erts_proc_sig_get_external(ErtsMessage *msgp) static void do_seq_trace_output(Eterm to, Eterm token, Eterm msg); static void -send_gen_exit_signal(Process *c_p, Eterm from_tag, +send_gen_exit_signal(ErtsPTabElementCommon *sender, Eterm from_tag, Eterm from, Eterm to, Sint16 op, Eterm reason, ErtsDistExternal *dist_ext, ErlHeapFragment *dist_ext_hfrag, @@ -1141,11 +1184,18 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ErlOffHeap *ohp; Uint hsz, from_sz, reason_sz, ref_sz, token_sz, dist_ext_sz = 0; int seq_trace; + Process *c_p; #ifdef USE_VM_PROBES Eterm s_utag, utag; Uint utag_sz; #endif + if (sender && is_internal_pid(from)) { + c_p = ErtsContainerStruct(sender, Process, common); + } else { + c_p = NULL; + } + ASSERT((is_value(reason) && dist_ext == NULL) || (is_non_value(reason) && dist_ext != NULL)); @@ -1154,8 +1204,9 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, hsz = sizeof(ErtsExitSignalData)/sizeof(Eterm); seq_trace = c_p && have_seqtrace(token); - if (seq_trace) + if (seq_trace) { seq_trace_update_serial(c_p); + } #ifdef USE_VM_PROBES utag_sz = 0; @@ -1292,12 +1343,20 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ASSERT(hp == mp->hfrag.mem + mp->hfrag.alloc_size); - if (seq_trace) + if (seq_trace) { do_seq_trace_output(to, s_token, s_message); + } - if (!proc_queue_signal(c_p, to, (ErtsSignal *) mp, op)) { - mp->next = NULL; - erts_cleanup_messages(mp); + { + /* Ensure that we're ordered relative to the sender process if one + * exists, and not `from` as it may be a name instead of a pid. */ + Eterm order_by = sender ? sender->id : from; + + if (!proc_queue_signal(sender, order_by, to, (ErtsSignal *)mp, + !(is_pid(order_by) || is_port(order_by)), op)) { + mp->next = NULL; + erts_cleanup_messages(mp); + } } } @@ -1610,7 +1669,7 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to, Eterm msg, Eterm hfrag_low, hfrag_high); } - if (!proc_queue_signal(c_p, pid, (ErtsSignal *) mp, + if (!proc_queue_signal(&c_p->common, from, pid, (ErtsSignal *) mp, 0, ERTS_SIG_Q_OP_ALIAS_MSG)) { mp->next = NULL; erts_cleanup_messages(mp); @@ -1629,7 +1688,8 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to, Eterm msg, Eterm } void -erts_proc_sig_send_dist_to_alias(Eterm alias, ErtsDistExternal *edep, +erts_proc_sig_send_dist_to_alias(Eterm from, Eterm alias, + ErtsDistExternal *edep, ErlHeapFragment *hfrag, Eterm token) { ErtsMessage* mp; @@ -1692,12 +1752,11 @@ erts_proc_sig_send_dist_to_alias(Eterm alias, ErtsDistExternal *edep, ERTS_SIG_Q_TYPE_DIST, 0); - if (!proc_queue_signal(NULL, pid, (ErtsSignal *) mp, + if (!proc_queue_signal(NULL, from, pid, (ErtsSignal *) mp, 0, ERTS_SIG_Q_OP_ALIAS_MSG)) { mp->next = NULL; erts_cleanup_messages(mp); } - } @@ -1757,7 +1816,7 @@ erts_proc_sig_send_persistent_monitor_msg(Uint16 type, Eterm key, ERL_MESSAGE_FROM(mp) = from; ERL_MESSAGE_TOKEN(mp) = am_undefined; - if (!proc_queue_signal(NULL, to, (ErtsSignal *) mp, + if (!proc_queue_signal(NULL, from, to, (ErtsSignal *) mp, 0, ERTS_SIG_Q_OP_PERSISTENT_MON_MSG)) { mp->next = NULL; erts_cleanup_messages(mp); @@ -1776,12 +1835,14 @@ get_persist_mon_msg(ErtsMessage *sig, Eterm *msg) } void -erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to, +erts_proc_sig_send_exit(ErtsPTabElementCommon *sender, Eterm from, Eterm to, Eterm reason, Eterm token, int normal_kills) { Eterm from_tag; - ASSERT(!c_p || c_p->common.id == from); + + ASSERT(sender == NULL || sender->id == from); + if (is_immed(from)) { ASSERT(is_internal_pid(from) || is_internal_port(from)); from_tag = from; @@ -1792,7 +1853,8 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to, dep = external_pid_dist_entry(from); from_tag = dep->sysname; } - send_gen_exit_signal(c_p, from_tag, from, to, ERTS_SIG_Q_OP_EXIT, + + send_gen_exit_signal(sender, from_tag, from, to, ERTS_SIG_Q_OP_EXIT, reason, NULL, NULL, NIL, token, normal_kills, 0, 0); } @@ -1809,87 +1871,84 @@ erts_proc_sig_send_dist_exit(DistEntry *dep, } void -erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk, - Eterm reason, Eterm token) +erts_proc_sig_send_link_exit_noconnection(ErtsLink *lnk) { Eterm to, from_tag, from_item; - int conn_lost; + ErtsLink *olnk; + ErtsELink *elnk; Uint32 conn_id; - ASSERT(!c_p || c_p->common.id == from); + + to = lnk->other.item; + + ASSERT(lnk->flags & ERTS_ML_FLG_EXTENDED); + ASSERT(lnk->type == ERTS_LNK_TYPE_DIST_PROC); + + olnk = erts_link_to_other(lnk, &elnk); + + from_item = olnk->other.item; + from_tag = elnk->dist->nodename; + conn_id = elnk->dist->connection_id; + + send_gen_exit_signal(NULL, from_tag, from_item, to, ERTS_SIG_Q_OP_EXIT_LINKED, + am_noconnection, NULL, NULL, NIL, NIL, 0, !0, conn_id); + + erts_link_release(lnk); +} + +void +erts_proc_sig_send_link_exit(ErtsPTabElementCommon *sender, Eterm from, + ErtsLink *lnk, Eterm reason, Eterm token) +{ + Eterm to; + + ASSERT(sender == NULL || sender->id == from); ASSERT(lnk); + to = lnk->other.item; - if (is_value(from)) { - ASSERT(is_internal_pid(from) || is_internal_port(from)); - from_tag = from_item = from; - conn_id = 0; - conn_lost = 0; - } - else { - ErtsLink *olnk; - ErtsELink *elnk; - ASSERT(reason == am_noconnection); - ASSERT(lnk->flags & ERTS_ML_FLG_EXTENDED); - ASSERT(lnk->type == ERTS_LNK_TYPE_DIST_PROC); + ASSERT(is_internal_pid(from) || is_internal_port(from)); - olnk = erts_link_to_other(lnk, &elnk); + send_gen_exit_signal(sender, from, from, to, ERTS_SIG_Q_OP_EXIT_LINKED, + reason, NULL, NULL, NIL, token, 0, 0, 0); - from_item = olnk->other.item; - from_tag = elnk->dist->nodename; - conn_id = elnk->dist->connection_id; - conn_lost = !0; - } - send_gen_exit_signal(c_p, from_tag, from_item, to, ERTS_SIG_Q_OP_EXIT_LINKED, - reason, NULL, NULL, NIL, token, 0, conn_lost, conn_id); erts_link_release(lnk); } int -erts_proc_sig_send_link(Process *c_p, Eterm to, ErtsLink *lnk) +erts_proc_sig_send_link(ErtsPTabElementCommon *sender, Eterm from, + Eterm to, ErtsLink *lnk) { ErtsSignal *sig; Uint16 type = lnk->type; - ASSERT(!c_p || c_p->common.id == lnk->other.item); - ASSERT(lnk); + ASSERT(!sender || sender->id == from); + ASSERT(lnk && eq(from, lnk->other.item)); ASSERT(is_internal_pid(to)); sig = (ErtsSignal *) lnk; sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_LINK, type, 0); - return proc_queue_signal(c_p, to, sig, ERTS_SIG_Q_OP_LINK); + return proc_queue_signal(sender, from, to, sig, 0, ERTS_SIG_Q_OP_LINK); } ErtsSigUnlinkOp * -erts_proc_sig_make_unlink_op(Process *c_p, Eterm from) +erts_proc_sig_make_unlink_op(ErtsPTabElementCommon *sender, Eterm from) { - Uint64 id; ErtsSigUnlinkOp *sulnk; - if (c_p) - id = erts_proc_sig_new_unlink_id(c_p); - else { - /* - * *Only* ports are allowed to call without current - * process pointer... - */ - ASSERT(is_internal_port(from)); - id = (Uint64) erts_raw_get_unique_monotonic_integer(); - if (id == 0) - id = (Uint64) erts_raw_get_unique_monotonic_integer(); - } - - ASSERT(id != 0); + + ASSERT(sender->id == from); sulnk = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSigUnlinkOp)); sulnk->from = from; - sulnk->id = id; + sulnk->id = erts_proc_sig_new_unlink_id(sender); return sulnk; } Uint64 -erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk) +erts_proc_sig_send_unlink(ErtsPTabElementCommon *sender, Eterm from, + ErtsLink *lnk) { int res; ErtsSignal *sig; @@ -1901,7 +1960,7 @@ erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk) || lnk->type != ERTS_LNK_TYPE_PORT); ASSERT(lnk->flags & ERTS_ML_FLG_IN_TABLE); - sulnk = erts_proc_sig_make_unlink_op(c_p, from); + sulnk = erts_proc_sig_make_unlink_op(sender, from); id = sulnk->id; sig = (ErtsSignal *) sulnk; to = lnk->other.item; @@ -1909,7 +1968,7 @@ erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk) lnk->type, 0); ASSERT(is_internal_pid(to)); - res = proc_queue_signal(c_p, to, sig, ERTS_SIG_Q_OP_UNLINK); + res = proc_queue_signal(sender, from, to, sig, 0, ERTS_SIG_Q_OP_UNLINK); if (res == 0) { erts_proc_sig_destroy_unlink_op(sulnk); return 0; @@ -1918,7 +1977,8 @@ erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk) } void -erts_proc_sig_send_unlink_ack(Process *c_p, Eterm from, ErtsSigUnlinkOp *sulnk) +erts_proc_sig_send_unlink_ack(ErtsPTabElementCommon *sender, Eterm from, + ErtsSigUnlinkOp *sulnk) { ErtsSignal *sig = (ErtsSignal *) sulnk; Eterm to = sulnk->from; @@ -1931,8 +1991,10 @@ erts_proc_sig_send_unlink_ack(Process *c_p, Eterm from, ErtsSigUnlinkOp *sulnk) type = is_internal_pid(from) ? ERTS_LNK_TYPE_PROC : ERTS_LNK_TYPE_PORT; sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_UNLINK_ACK, type, 0); - if (!proc_queue_signal(c_p, to, sig, ERTS_SIG_Q_OP_UNLINK_ACK)) - erts_proc_sig_destroy_unlink_op(sulnk); + + if (!proc_queue_signal(sender, from, to, sig, 0, ERTS_SIG_Q_OP_UNLINK_ACK)) { + erts_proc_sig_destroy_unlink_op(sulnk); + } } void @@ -1965,12 +2027,14 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Uint32 conn_id, dep->sysname, conn_id, to, from, id); - if (!proc_queue_signal(NULL, to, sig, ERTS_SIG_Q_OP_UNLINK)) + if (!proc_queue_signal(NULL, from, to, sig, 0, + ERTS_SIG_Q_OP_UNLINK)) { reply_dist_unlink_ack(NULL, (ErtsSigDistUnlinkOp *) sig); + } } void -erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep, +erts_proc_sig_send_dist_unlink_ack(DistEntry *dep, Uint32 conn_id, Eterm from, Eterm to, Uint64 id) { @@ -1985,8 +2049,10 @@ erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep, dep->sysname, conn_id, to, from, id); - if (!proc_queue_signal(c_p, to, sig, ERTS_SIG_Q_OP_UNLINK_ACK)) + if (!proc_queue_signal(NULL, from, to, sig, 0, + ERTS_SIG_Q_OP_UNLINK_ACK)) { destroy_sig_dist_unlink_op((ErtsSigDistUnlinkOp *) sig); + } } static void @@ -2039,17 +2105,20 @@ erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref, Eterm reason) { Eterm monitored, heap[3]; + if (is_atom(from)) monitored = TUPLE2(&heap[0], from, dep->sysname); else monitored = from; + send_gen_exit_signal(NULL, dep->sysname, monitored, to, ERTS_SIG_Q_OP_MONITOR_DOWN, reason, dist_ext, hfrag, ref, NIL, 0, 0, 0); } void -erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason) +erts_proc_sig_send_monitor_down(ErtsPTabElementCommon *sender, Eterm from, + ErtsMonitor *mon, Eterm reason) { Eterm to; @@ -2069,8 +2138,11 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason) sig = (ErtsSignal *) mon; sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_MONITOR_DOWN, mon->type, 0); - if (proc_queue_signal(NULL, to, sig, ERTS_SIG_Q_OP_MONITOR_DOWN)) + if (proc_queue_signal(sender, from, to, sig, + !(is_pid(from) || is_port(from)), + ERTS_SIG_Q_OP_MONITOR_DOWN)) { return; /* receiver will destroy mon structure */ + } } else { ErtsMonitorData *mdp = erts_monitor_to_data(mon); @@ -2114,16 +2186,18 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason) || is_atom(from_tag)); monitored = TUPLE2(&heap[0], name, node); } - send_gen_exit_signal(NULL, from_tag, monitored, + + send_gen_exit_signal(sender, from_tag, monitored, to, ERTS_SIG_Q_OP_MONITOR_DOWN, reason, NULL, NULL, mdp->ref, NIL, 0, 0, 0); } + erts_monitor_release(mon); } void -erts_proc_sig_send_dist_demonitor(Eterm to, Eterm ref) +erts_proc_sig_send_dist_demonitor(Eterm from, Eterm to, Eterm ref) { ErtsSigDistProcDemonitor *dmon; ErtsSignal *sig; @@ -2133,6 +2207,7 @@ erts_proc_sig_send_dist_demonitor(Eterm to, Eterm ref) ERTS_INIT_OFF_HEAP(&oh); + ASSERT(is_external_pid(from)); ASSERT(is_internal_pid(to)); size = sizeof(ErtsSigDistProcDemonitor) - sizeof(Eterm); @@ -2149,12 +2224,14 @@ erts_proc_sig_send_dist_demonitor(Eterm to, Eterm ref) ERTS_SIG_Q_TYPE_DIST_PROC_DEMONITOR, 0); - if (!proc_queue_signal(NULL, to, sig, ERTS_SIG_Q_OP_DEMONITOR)) + if (!proc_queue_signal(NULL, from, to, sig, 0, ERTS_SIG_Q_OP_DEMONITOR)) { destroy_dist_proc_demonitor(dmon); + } } void -erts_proc_sig_send_demonitor(ErtsMonitor *mon) +erts_proc_sig_send_demonitor(ErtsPTabElementCommon *sender, Eterm from, + int system, ErtsMonitor *mon) { ErtsSignal *sig = (ErtsSignal *) mon; Uint16 type = mon->type; @@ -2163,16 +2240,21 @@ erts_proc_sig_send_demonitor(ErtsMonitor *mon) ASSERT(is_internal_pid(to)); ASSERT(erts_monitor_is_origin(mon)); ASSERT(!erts_monitor_is_in_table(mon)); + ASSERT(!system || sender == NULL); sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_DEMONITOR, type, 0); - - if (!proc_queue_signal(NULL, to, sig, ERTS_SIG_Q_OP_DEMONITOR)) + + if (!proc_queue_signal(sender, from, to, sig, + !(system || (is_pid(from) || is_port(from))), + ERTS_SIG_Q_OP_DEMONITOR)) { erts_monitor_release(mon); + } } int -erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to) +erts_proc_sig_send_monitor(ErtsPTabElementCommon *sender, Eterm from, + ErtsMonitor *mon, Eterm to) { ErtsSignal *sig = (ErtsSignal *) mon; Uint16 type = mon->type; @@ -2182,31 +2264,8 @@ erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to) sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_MONITOR, type, 0); - - return proc_queue_signal(NULL, to, sig, ERTS_SIG_Q_OP_MONITOR); -} - -void -erts_proc_sig_send_trace_change(Eterm to, Uint on, Uint off, Eterm tracer) -{ - ErtsSigTraceInfo *ti; - Eterm tag; - ti = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSigTraceInfo)); - tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_TRACE_CHANGE_STATE, - ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO, - 0); - - ti->common.tag = tag; - ti->flags_off = off; - ti->flags_on = on; - ti->tracer = NIL; - if (is_not_nil(tracer)) - erts_tracer_update(&ti->tracer, tracer); - - if (!proc_queue_signal(NULL, to, (ErtsSignal *) ti, - ERTS_SIG_Q_OP_TRACE_CHANGE_STATE)) - destroy_trace_info(ti); + return proc_queue_signal(sender, from, to, sig, 0, ERTS_SIG_Q_OP_MONITOR); } void @@ -2245,12 +2304,12 @@ erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref) ERTS_SIG_Q_TYPE_UNDEFINED, 0); - res = proc_queue_signal(c_p, to, (ErtsSignal *) sgl, - ERTS_SIG_Q_OP_GROUP_LEADER); + res = proc_queue_signal(c_p ? &c_p->common : NULL, sgl->reply_to, to, + (ErtsSignal *)sgl, 0, ERTS_SIG_Q_OP_GROUP_LEADER); - if (!res) + if (!res) { destroy_sig_group_leader(sgl); - else if (c_p) { + } else if (c_p) { erts_aint_t flags, rm_flags = ERTS_SIG_GL_FLG_SENDER; int prio_res = maybe_elevate_sig_handling_prio(c_p, -1, to); if (!prio_res) @@ -2300,7 +2359,8 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref) ERTS_SIG_Q_TYPE_UNDEFINED, 0); - if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE)) { + if (proc_queue_signal(&c_p->common, c_p->common.id, to, + (ErtsSignal *)mp, 0, ERTS_SIG_Q_OP_IS_ALIVE)) { (void) maybe_elevate_sig_handling_prio(c_p, -1, to); return !0; } @@ -2357,12 +2417,15 @@ erts_proc_sig_send_process_info_request(Process *c_p, sys_memcpy((void *) &pis->item_ix[0], (void *) item_ix, sizeof(int)*len); - res = proc_queue_signal(c_p, to, (ErtsSignal *) pis, - ERTS_SIG_Q_OP_PROCESS_INFO); - if (res) + + res = proc_queue_signal(&c_p->common, c_p->common.id, to, + (ErtsSignal *)pis, 0, ERTS_SIG_Q_OP_PROCESS_INFO); + if (res) { (void) maybe_elevate_sig_handling_prio(c_p, -1, to); - else + } else { erts_free(ERTS_ALC_T_SIG_DATA, pis); + } + return res; } @@ -2406,9 +2469,10 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply) ERTS_SIG_Q_TYPE_UNDEFINED, 0); - if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND)) + if (proc_queue_signal(&c_p->common, c_p->common.id, to, + (ErtsSignal *)mp, 0, ERTS_SIG_Q_OP_SYNC_SUSPEND)) { (void) maybe_elevate_sig_handling_prio(c_p, -1, to); - else { + } else { Eterm *tp; /* It wasn't alive; reply to ourselves... */ mp->next = NULL; @@ -2506,7 +2570,8 @@ erts_proc_sig_send_dist_spawn_reply(Eterm node, 0); ERL_MESSAGE_FROM(mp) = node; ERL_MESSAGE_TOKEN(mp) = token_copy; - if (!proc_queue_signal(NULL, to, (ErtsSignal *) mp, + + if (!proc_queue_signal(NULL, am_spawn_service, to, (ErtsSignal *)mp, 0, ERTS_SIG_Q_OP_DIST_SPAWN_REPLY)) { mp->next = NULL; mp->data.attached = ERTS_MSG_COMBINED_HFRAG; @@ -2562,9 +2627,10 @@ erts_proc_sig_send_rpc_request_prio(Process *c_p, erts_msgq_set_save_end(c_p); } - if (proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_RPC)) + if (proc_queue_signal(&c_p->common, c_p->common.id, to, (ErtsSignal *)sig, + 0, ERTS_SIG_Q_OP_RPC)) { (void) maybe_elevate_sig_handling_prio(c_p, prio, to); - else { + } else { erts_free(ERTS_ALC_T_SIG_DATA, sig); res = THE_NON_VALUE; if (reply) @@ -2620,12 +2686,14 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id) ERL_MESSAGE_DT_UTAG(sig) = NIL; #endif - if (!proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_ADJ_MSGQ)) + if (!proc_queue_signal(&c_p->common, c_p->common.id, to, (ErtsSignal *)sig, + 0, ERTS_SIG_Q_OP_ADJ_MSGQ)) { send_cla_reply(c_p, sig, c_p->common.id, req_id_cpy, am_ok); + } } void -erts_proc_sig_send_move_msgq_off_heap(Process *c_p, Eterm to) +erts_proc_sig_send_move_msgq_off_heap(Eterm to) { ErtsMessage *sig = erts_alloc_message(0, NULL); ASSERT(is_internal_pid(to)); @@ -2637,9 +2705,11 @@ erts_proc_sig_send_move_msgq_off_heap(Process *c_p, Eterm to) #ifdef USE_VM_PROBES ERL_MESSAGE_DT_UTAG(sig) = NIL; #endif - if (!proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_ADJ_MSGQ)) { - sig->next = NULL; - erts_cleanup_messages(sig); + + if (!proc_queue_signal(NULL, am_system, to, (ErtsSignal *)sig, 0, + ERTS_SIG_Q_OP_ADJ_MSGQ)) { + sig->next = NULL; + erts_cleanup_messages(sig); } } @@ -4939,7 +5009,7 @@ handle_alias_message(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig) erts_monitor_release(mon); break; case ERTS_MON_TYPE_PROC: - erts_proc_sig_send_demonitor(mon); + erts_proc_sig_send_demonitor(&c_p->common, c_p->common.id, 0, mon); break; case ERTS_MON_TYPE_DIST_PROC: { ErtsMonitorData *mdp; @@ -5463,9 +5533,10 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, erts_link_release(llnk); cnt += 4; } - if (is_internal_pid(sulnk->from)) - erts_proc_sig_send_unlink_ack(c_p, c_p->common.id, sulnk); - else { + if (is_internal_pid(sulnk->from)) { + erts_proc_sig_send_unlink_ack(&c_p->common, c_p->common.id, + sulnk); + } else { Port *prt; ASSERT(is_internal_port(sulnk->from)); prt = erts_port_lookup(sulnk->from, @@ -6000,12 +6071,12 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp, } case ERTS_SIG_Q_OP_UNLINK: - if (type == ERTS_SIG_Q_TYPE_DIST_LINK) + if (type == ERTS_SIG_Q_TYPE_DIST_LINK) { reply_dist_unlink_ack(c_p, (ErtsSigDistUnlinkOp *) sig); - else if (is_internal_pid(((ErtsSigUnlinkOp *) sig)->from)) - erts_proc_sig_send_unlink_ack(c_p, c_p->common.id, + } else if (is_internal_pid(((ErtsSigUnlinkOp *) sig)->from)) { + erts_proc_sig_send_unlink_ack(&c_p->common, c_p->common.id, (ErtsSigUnlinkOp *) sig); - else { + } else { Port *prt; ASSERT(is_internal_port(((ErtsSigUnlinkOp *) sig)->from)); prt = erts_port_lookup(((ErtsSigUnlinkOp *) sig)->from, @@ -7956,7 +8027,7 @@ static void proc_sig_queue_unlock_buffer(ErtsSignalInQueueBuffer* slot) } int -erts_proc_sig_queue_try_enqueue_to_buffer(Process* sender, /* is NULL if the sender is not a local process */ +erts_proc_sig_queue_try_enqueue_to_buffer(Eterm from, Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* first, @@ -7972,23 +8043,25 @@ erts_proc_sig_queue_try_enqueue_to_buffer(Process* sender, /* is NULL if the sen /* We never need to unget the buffers array if we do not get it */ return 0; } else { - /* - * Use the sender process ID to hash to an outer signal queue - * buffer. This guarantees that all signals from the same - * process are ordered in send order. - */ - Uint to_hash = - (sender == NULL ? 0 : internal_pid_number(sender->common.id)); - Uint slot = to_hash % ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; - ErtsSignalInQueueBuffer* buffer = &buffers->slots[slot]; - Uint64 nonempty_slots_before = 1; - Uint32 state; - /* - * Multiple signals or is_nonmsg_signal_enqueue means that we - * report that there is a non-msg signal in the queue. - */ - int is_nonmsg_signal_or_multi_sig = - is_nonmsg_signal_enqueue || !(last == &first->next); + int is_nonmsg_signal_or_multi_sig; + ErtsSignalInQueueBuffer* buffer; + Uint64 nonempty_slots_before; + Uint32 slot, state; + + ASSERT(is_value(from)); + + /* Use the sender id to hash to an outer signal queue buffer. This + * guarantees that all signals from the same process are ordered in + * send order. */ + slot = make_internal_hash(from, 0) % + ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; + buffer = &buffers->slots[slot]; + nonempty_slots_before = 1; + + /* Multiple signals or is_nonmsg_signal_enqueue means that we + * report that there is a non-msg signal in the queue. */ + is_nonmsg_signal_or_multi_sig = is_nonmsg_signal_enqueue || + !(last == &first->next); proc_sig_queue_lock_buffer(buffer); diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index 988e9b6e45..8730fe663a 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -252,7 +252,7 @@ ErtsSignalInQueueBufferArray* erts_proc_sig_queue_get_buffers(Process* p, int *need_unread); void erts_proc_sig_queue_unget_buffers(ErtsSignalInQueueBufferArray* buffers, int need_unget); -int erts_proc_sig_queue_try_enqueue_to_buffer(Process* sender, /* is NULL if the sender is not a local process */ +int erts_proc_sig_queue_try_enqueue_to_buffer(Eterm from, Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* first, @@ -327,8 +327,10 @@ struct dist_entry_; * @brief Send an exit signal to a process. * * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] from Identifier of sender. * @@ -345,7 +347,7 @@ struct dist_entry_; * */ void -erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to, +erts_proc_sig_send_exit(ErtsPTabElementCommon *sender, Eterm from, Eterm to, Eterm reason, Eterm token, int normal_kills); /** @@ -380,13 +382,28 @@ erts_proc_sig_send_dist_exit(DistEntry *dep, ErlHeapFragment *hfrag, Eterm reason, Eterm token); +/** + * + * @brief Send an exit signal due to a link to a process being + * broken by connection loss. + * + * @param[in] lnk Pointer to link structure + * from the sending side. It + * should contain information + * about receiver. + */ +void +erts_proc_sig_send_link_exit_noconnection(ErtsLink *lnk); + /** * * @brief Send an exit signal due to broken link to a process. * * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] from Identifier of sender. * @@ -401,16 +418,18 @@ erts_proc_sig_send_dist_exit(DistEntry *dep, * */ void -erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk, - Eterm reason, Eterm token); +erts_proc_sig_send_link_exit(ErtsPTabElementCommon *sender, Eterm from, + ErtsLink *lnk, Eterm reason, Eterm token); /** * * @brief Send an link signal to a process. * * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] to Identifier of receiver. * @@ -428,7 +447,8 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk, * */ int -erts_proc_sig_send_link(Process *c_p, Eterm to, ErtsLink *lnk); +erts_proc_sig_send_link(ErtsPTabElementCommon *sender, Eterm from, + Eterm to, ErtsLink *lnk); /** * @@ -437,15 +457,15 @@ erts_proc_sig_send_link(Process *c_p, Eterm to, ErtsLink *lnk); * The newly created unlink identifier is to be used in an * unlink operation. * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * @param[in] sender Pointer to the sending process/port. * * @return A new 64-bit unlink identifier * unique in context of the * calling process. The identifier * may be any value but zero. */ -ERTS_GLB_INLINE Uint64 erts_proc_sig_new_unlink_id(Process *c_p); +ERTS_GLB_INLINE +Uint64 erts_proc_sig_new_unlink_id(ErtsPTabElementCommon *sender); /** * @@ -454,13 +474,10 @@ ERTS_GLB_INLINE Uint64 erts_proc_sig_new_unlink_id(Process *c_p); * The structure will contain a newly created unlink * identifier to be used in the operation. * - * @param[in] c_p Pointer to process struct of - * currently executing process - * ('from' is a process - * identifier), or NULL if not - * called in the context of an - * executing process ('from' is - * a port identifier). + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] from Id (as an erlang term) of * entity sending the unlink @@ -470,7 +487,7 @@ ERTS_GLB_INLINE Uint64 erts_proc_sig_new_unlink_id(Process *c_p); * structure. */ ErtsSigUnlinkOp * -erts_proc_sig_make_unlink_op(Process *c_p, Eterm from); +erts_proc_sig_make_unlink_op(ErtsPTabElementCommon *sender, Eterm from); /** * @@ -487,8 +504,10 @@ erts_proc_sig_destroy_unlink_op(ErtsSigUnlinkOp *sulnk); * @brief Send an unlink signal to a process. * * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] from Id (as an erlang term) of * entity sending the unlink @@ -500,15 +519,18 @@ erts_proc_sig_destroy_unlink_op(ErtsSigUnlinkOp *sulnk); * receiver. */ Uint64 -erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk); +erts_proc_sig_send_unlink(ErtsPTabElementCommon *sender, Eterm from, + ErtsLink *lnk); /** * * @brief Send an unlink acknowledgment signal to a process. * - * - * @param[in] c_p Pointer to process struct of - * currently executing process. + * + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * * @param[in] from Id (as an erlang term) of * entity sending the unlink @@ -521,7 +543,7 @@ erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk); * signal. */ void -erts_proc_sig_send_unlink_ack(Process *c_p, Eterm from, +erts_proc_sig_send_unlink_ack(ErtsPTabElementCommon *sender, Eterm from, ErtsSigUnlinkOp *sulnk); /** @@ -583,11 +605,6 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Uint32 conn_id, * This function is used instead of erts_proc_sig_send_unlink_ack() * when the signal arrives via the distribution. * - * @param[in] c_p Pointer to process struct of - * currently executing process or - * NULL if not called in the context - * of an executing process. - * * @param[in] dep Distribution entry of channel * that the signal arrived on. * @@ -598,7 +615,7 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Uint32 conn_id, * @param[in] id Identifier of unlink operation. */ void -erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep, +erts_proc_sig_send_dist_unlink_ack(DistEntry *dep, Uint32 conn_id, Eterm from, Eterm to, Uint64 id); @@ -606,6 +623,14 @@ erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep, * * @brief Send a monitor down signal to a process. * + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). + * + * @param[in] from Sending entity, must be provided + * to maintain signal order. + * * @param[in] mon Pointer to target monitor * structure from the sending * side. It should contain @@ -615,27 +640,48 @@ erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep, * */ void -erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason); +erts_proc_sig_send_monitor_down(ErtsPTabElementCommon *sender, Eterm from, + ErtsMonitor *mon, Eterm reason); /** * * @brief Send a demonitor signal to a process. * - * @param[in] mon Pointer to origin monitor - * structure from the sending - * side. It should contain - * information about receiver. + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). * - * @param[in] reason Exit reason. + * @param[in] from Sending entity, must be provided + * to maintain signal order. + * + * @param[in] system Whether the sender is considered a + * system service, e.g. a NIF monitor, + * and it's okay to order by `from` + * even when it's not a pid or port. + * + * @param[in] mon Pointer to origin monitor + * structure from the sending + * side. It should contain + * information about receiver. * */ void -erts_proc_sig_send_demonitor(ErtsMonitor *mon); +erts_proc_sig_send_demonitor(ErtsPTabElementCommon *sender, Eterm from, + int system, ErtsMonitor *mon); /** * * @brief Send a monitor signal to a process. * + * @param[in] sender Pointer to the sending process/port, + * if any, as it may not be possible to + * resolve the sender (e.g. after it's + * dead). + * + * @param[in] from Sending entity, must be provided + * to maintain signal order. + * * @param[in] mon Pointer to target monitor * structure to insert on * receiver side. @@ -653,7 +699,8 @@ erts_proc_sig_send_demonitor(ErtsMonitor *mon); * */ int -erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to); +erts_proc_sig_send_monitor(ErtsPTabElementCommon *sender, Eterm from, + ErtsMonitor *mon, Eterm to); /** * @@ -696,13 +743,15 @@ erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref, * when the signal arrives via the distribution and * no monitor structure is available. * + * @param[in] from Identifier of sender. + * * @param[in] to Identifier of receiver. * * @param[in] ref Reference identifying the monitor. * */ void -erts_proc_sig_send_dist_demonitor(Eterm to, Eterm ref); +erts_proc_sig_send_dist_demonitor(Eterm from, Eterm to, Eterm ref); /** * @@ -729,24 +778,6 @@ erts_proc_sig_send_persistent_monitor_msg(Uint16 type, Eterm key, Eterm from, Eterm to, Eterm msg, Uint msg_sz); -/** - * - * @brief Send a trace change signal to a process. - * - * @param[in] to Identifier of receiver. - * - * @param[in] on Trace flags to enable. - * - * @param[in] off Trace flags to disable. - * - * @param[in] tracer Tracer to set. If the non-value, - * tracer will not be changed. - * - */ -void -erts_proc_sig_send_trace_change(Eterm to, Uint on, Uint off, - Eterm tracer); - /** * * @brief Send a group leader signal to a process. @@ -1066,14 +1097,11 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id); * * When received, all on heap messages will be moved off heap. * - * @param[in] c_p Pointer to process struct of - * currently executing process. - * * @param[in] to Identifier of receiver. * */ void -erts_proc_sig_send_move_msgq_off_heap(Process *c_p, Eterm to); +erts_proc_sig_send_move_msgq_off_heap(Eterm to); /* * End of send operations of currently supported process signals. @@ -1279,7 +1307,7 @@ erts_enqueue_signals(Process *rp, ErtsMessage *first, * */ void -erts_proc_sig_send_pending(ErtsSchedulerData* esdp); +erts_proc_sig_send_pending(Process *c_p, ErtsSchedulerData* esdp); void @@ -1287,7 +1315,8 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to, Eterm msg, Eterm token); void -erts_proc_sig_send_dist_to_alias(Eterm alias, ErtsDistExternal *edep, +erts_proc_sig_send_dist_to_alias(Eterm from, Eterm alias, + ErtsDistExternal *edep, ErlHeapFragment *hfrag, Eterm token); /** @@ -1644,14 +1673,30 @@ void erts_msgq_remove_leading_recv_markers(Process *c_p); #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE Uint64 -erts_proc_sig_new_unlink_id(Process *c_p) +erts_proc_sig_new_unlink_id(ErtsPTabElementCommon *sender) { Uint64 id; - ASSERT(c_p); - id = (Uint64) c_p->uniq++; - if (id == 0) + ASSERT(sender); + + if (is_internal_pid(sender->id)) { + Process *c_p = ErtsContainerStruct(sender, Process, common); id = (Uint64) c_p->uniq++; + + if (id == 0) { + id = (Uint64) c_p->uniq++; + } + } else { + ASSERT(is_internal_port(sender->id)); + + id = (Uint64) erts_raw_get_unique_monotonic_integer(); + if (id == 0) { + id = (Uint64) erts_raw_get_unique_monotonic_integer(); + } + } + + ASSERT(id != 0); + return id; } @@ -1686,12 +1731,10 @@ erts_proc_sig_fetch(Process *proc) Uint32 state = erts_atomic32_read_acqb(&proc->state); if (!(ERTS_PSFLG_SIG_IN_Q & state) && erts_atomic64_read_nob(&buffers->nonmsg_slots)) { - /* - * We may have raced with a thread inserting into a buffer + /* We may have raced with a thread inserting into a buffer * when resetting the flag ERTS_PSFLG_SIG_IN_Q in one of * the fetch functions above so we have to make sure that - * it is set when there is a nonmsg signal in the buffers. - */ + * it is set when there is a nonmsg signal in the buffers. */ erts_atomic32_read_bor_nob(&proc->state, ERTS_PSFLG_SIG_IN_Q | ERTS_PSFLG_ACTIVE); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 6d289d2fe6..c75f67c0d7 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -8989,8 +8989,10 @@ erts_internal_suspend_process_2(BIF_ALIST_2) erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); } if (send_sig) { - if (erts_proc_sig_send_monitor(&mdp->u.target, BIF_ARG_1)) + if (erts_proc_sig_send_monitor(&BIF_P->common, BIF_P->common.id, + &mdp->u.target, BIF_ARG_1)) { sync = !async; + } else { noproc: erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), &mdp->origin); @@ -9047,7 +9049,7 @@ resume_process_1(BIF_ALIST_1) if ((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) == 0) { erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), mon); - erts_proc_sig_send_demonitor(mon); + erts_proc_sig_send_demonitor(&BIF_P->common, BIF_P->common.id, 0, mon); } BIF_RET(am_true); @@ -9497,8 +9499,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp)); if (esdp->pending_signal.sig) { - ASSERT(esdp->pending_signal.dbg_from == p); - erts_proc_sig_send_pending(esdp); + erts_proc_sig_send_pending(p, esdp); } } else { @@ -12592,8 +12593,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->common.id, parent_id); erts_link_tree_insert(&ERTS_P_LINKS(p), &ldp->proc); if (!erts_link_dist_insert(&ldp->dist, so->mld)) { - erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, &ldp->dist, - am_noconnection, NIL); + erts_proc_sig_send_link_exit_noconnection(&ldp->dist); } } @@ -13316,7 +13316,9 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds) switch (mon->type) { case ERTS_MON_TYPE_SUSPEND: case ERTS_MON_TYPE_PROC: - erts_proc_sig_send_monitor_down(mon, reason); + erts_proc_sig_send_monitor_down(&c_p->common, + c_p->common.id, + mon, reason); mon = NULL; break; case ERTS_MON_TYPE_PORT: { @@ -13393,7 +13395,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds) switch (mon->type) { case ERTS_MON_TYPE_SUSPEND: case ERTS_MON_TYPE_PROC: - erts_proc_sig_send_demonitor(mon); + erts_proc_sig_send_demonitor(&c_p->common, c_p->common.id, 0, mon); mon = NULL; break; case ERTS_MON_TYPE_TIME_OFFSET: @@ -13564,7 +13566,7 @@ erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt, Sint reds) switch (lnk->type) { case ERTS_LNK_TYPE_PROC: ASSERT(is_internal_pid(lnk->other.item)); - erts_proc_sig_send_link_exit(c_p, c_p->common.id, lnk, + erts_proc_sig_send_link_exit(&c_p->common, c_p->common.id, lnk, reason, SEQ_TRACE_TOKEN(c_p)); lnk = NULL; break; diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 48c5c20bdd..0943d0c210 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -6014,8 +6014,7 @@ Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, #endif /* Send unlink ack to local sender... */ - erts_proc_sig_send_dist_unlink_ack(NULL, dep, - dep->connection_id, + erts_proc_sig_send_dist_unlink_ack(dep, dep->connection_id, remote, local, id); transcode_decode_state_destroy(&tds); diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index e1a58a1262..8014e7bd08 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -819,7 +819,8 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ port_lnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, pid); erts_link_tree_insert(&ERTS_P_LINKS(port), port_lnk); proc_lnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, port->common.id); - if (!erts_proc_sig_send_link(NULL, pid, proc_lnk)) { + if (!erts_proc_sig_send_link(&port->common, port->common.id, + pid, proc_lnk)) { erts_link_tree_delete(&ERTS_P_LINKS(port), port_lnk); erts_link_internal_release(proc_lnk); erts_link_internal_release(port_lnk); @@ -1230,7 +1231,7 @@ send_badsig(Port *prt) { ERTS_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_get_scheduler_id()); ASSERT(is_internal_pid(connected)); - erts_proc_sig_send_exit(NULL, prt->common.id, connected, + erts_proc_sig_send_exit(&prt->common, prt->common.id, connected, am_badsig, NIL, 0); } /* send_badsig */ @@ -2350,7 +2351,8 @@ set_port_connected(int bang_op, if (created) { ErtsLink *olnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, prt->common.id); - if (!erts_proc_sig_send_link(NULL, connect, olnk)) { + if (!erts_proc_sig_send_link(&prt->common, prt->common.id, + connect, olnk)) { erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk); erts_link_internal_release(lnk); erts_link_internal_release(olnk); @@ -2471,39 +2473,47 @@ erts_port_connect(Process *c_p, } static void -port_unlink_failure(Eterm port_id, ErtsSigUnlinkOp *sulnk) +port_unlink_failure(Port *prt, Eterm port_id, ErtsSigUnlinkOp *sulnk) { - erts_proc_sig_send_unlink_ack(NULL, port_id, sulnk); + erts_proc_sig_send_unlink_ack(prt ? &prt->common : NULL, port_id, sulnk); } static void port_unlink(Port *prt, erts_aint32_t state, ErtsSigUnlinkOp *sulnk) { - if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) - port_unlink_failure(prt->common.id, sulnk); - else { + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + port_unlink_failure(prt, prt->common.id, sulnk); + } else { ErtsILink *ilnk; + ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(prt), sulnk->from); + if (ilnk && !ilnk->unlinking) { if (IS_TRACED_FL(prt, F_TRACE_PORTS)) trace_port(prt, am_getting_unlinked, sulnk->from); erts_link_tree_delete(&ERTS_P_LINKS(prt), &ilnk->link); erts_link_internal_release(&ilnk->link); } - erts_proc_sig_send_unlink_ack(NULL, prt->common.id, sulnk); + + erts_proc_sig_send_unlink_ack(&prt->common, prt->common.id, sulnk); } } static int port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - if (op == ERTS_PROC2PORT_SIG_EXEC) - port_unlink(prt, state, sigdp->u.unlink.sulnk); - else - port_unlink_failure(sigdp->u.unlink.port_id, sigdp->u.unlink.sulnk); - if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) - port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + if (op == ERTS_PROC2PORT_SIG_EXEC) { + port_unlink(prt, state, sigdp->u.unlink.sulnk); + } else { + port_unlink_failure(prt, sigdp->u.unlink.port_id, + sigdp->u.unlink.sulnk); + } + + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) { + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + } + return ERTS_PORT_REDS_UNLINK; } @@ -2528,7 +2538,7 @@ erts_port_unlink(Process *c_p, Port *prt, ErtsSigUnlinkOp *sulnk, Eterm *refp) BUMP_REDS(c_p, ERTS_PORT_REDS_UNLINK); return ERTS_PORT_OP_DONE; case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: - port_unlink_failure(prt->common.id, sulnk); + port_unlink_failure(prt, prt->common.id, sulnk); return ERTS_PORT_OP_DROPPED; default: /* Schedule call instead... */ @@ -2628,17 +2638,17 @@ erts_port_unlink_ack(Process *c_p, Port *prt, ErtsSigUnlinkOp *sulnk) } static void -port_link_failure(Eterm port_id, ErtsLink *lnk) +port_link_failure(Port *port, Eterm port_id, ErtsLink *lnk) { - erts_proc_sig_send_link_exit(NULL, port_id, lnk, am_noproc, NIL); + erts_proc_sig_send_link_exit(&port->common, port_id, lnk, am_noproc, NIL); } static void port_link(Port *prt, erts_aint32_t state, ErtsLink *nlnk) { - if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) - port_link_failure(prt->common.id, nlnk); - else { + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + port_link_failure(prt, prt->common.id, nlnk); + } else { ErtsLink *lnk; lnk = erts_link_tree_lookup_insert(&ERTS_P_LINKS(prt), nlnk); if (lnk) @@ -2651,12 +2661,16 @@ port_link(Port *prt, erts_aint32_t state, ErtsLink *nlnk) static int port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - if (op == ERTS_PROC2PORT_SIG_EXEC) - port_link(prt, state, sigdp->u.link.lnk); - else - port_link_failure(sigdp->u.link.port_id, sigdp->u.link.lnk); - if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) - port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + if (op == ERTS_PROC2PORT_SIG_EXEC) { + port_link(prt, state, sigdp->u.link.lnk); + } else { + port_link_failure(prt, sigdp->u.link.port_id, sigdp->u.link.lnk); + } + + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) { + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + } + return ERTS_PORT_REDS_LINK; } @@ -2701,9 +2715,11 @@ erts_port_link(Process *c_p, Port *prt, ErtsLink *lnk, Eterm *refp) } static void -port_monitor_failure(Eterm port_id, ErtsMonitor *mon) +port_monitor_failure(Port *prt, Eterm port_id, ErtsMonitor *mon) { - erts_proc_sig_send_monitor_down(mon, am_noproc); + ASSERT(prt == NULL || prt->common.id == port_id); + erts_proc_sig_send_monitor_down(prt ? &prt->common : NULL, port_id, + mon, am_noproc); } /* Origin wants to monitor port Prt. State contains possible error, which has @@ -2713,9 +2729,10 @@ static void port_monitor(Port *prt, erts_aint32_t state, ErtsMonitor *mon) { ASSERT(prt); - if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) - port_monitor_failure(prt->common.id, mon); - else { + + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + port_monitor_failure(prt, prt->common.id, mon); + } else { ASSERT(erts_monitor_is_target(mon)); erts_monitor_list_insert(&ERTS_P_LT_MONITORS(prt), mon); } @@ -2725,11 +2742,12 @@ static int port_sig_monitor(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - if (op == ERTS_PROC2PORT_SIG_EXEC) + if (op == ERTS_PROC2PORT_SIG_EXEC) { port_monitor(prt, state, sigdp->u.monitor.mon); - else - port_monitor_failure(sigdp->u.monitor.port_id, - sigdp->u.monitor.mon); + } else { + port_monitor_failure(prt, sigdp->u.monitor.port_id, sigdp->u.monitor.mon); + } + return ERTS_PORT_REDS_MONITOR; } @@ -2858,7 +2876,7 @@ unlink_proc(Port *prt, Eterm pid) ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(prt), pid); if (ilnk && !ilnk->unlinking) { - Uint64 id = erts_proc_sig_send_unlink(NULL, + Uint64 id = erts_proc_sig_send_unlink(&prt->common, prt->common.id, &ilnk->link); if (id != 0) @@ -3767,14 +3785,16 @@ erts_terminate_port(Port *pp) } typedef struct { - Eterm port_id; + Port *port; Eterm reason; } ErtsPortExitContext; static int link_port_exit(ErtsLink *lnk, void *vpectxt, Sint reds) { ErtsPortExitContext *pectxt = vpectxt; - erts_proc_sig_send_link_exit(NULL, pectxt->port_id, + Port *port = pectxt->port; + + erts_proc_sig_send_link_exit(&port->common, port->common.id, lnk, pectxt->reason, NIL); return 1; } @@ -3782,10 +3802,15 @@ static int link_port_exit(ErtsLink *lnk, void *vpectxt, Sint reds) static int monitor_port_exit(ErtsMonitor *mon, void *vpectxt, Sint reds) { ErtsPortExitContext *pectxt = vpectxt; - if (erts_monitor_is_target(mon)) - erts_proc_sig_send_monitor_down(mon, pectxt->reason); - else - erts_proc_sig_send_demonitor(mon); + Port *port = pectxt->port; + + if (erts_monitor_is_target(mon)) { + erts_proc_sig_send_monitor_down(&port->common, port->common.id, mon, + pectxt->reason); + } else { + erts_proc_sig_send_demonitor(&port->common, port->common.id, 0, mon); + } + return 1; } @@ -3866,8 +3891,8 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed, if (prt->common.u.alive.reg != NULL) (void) erts_unregister_name(NULL, 0, prt, prt->common.u.alive.reg->name); - pectxt.port_id = prt->common.id; pectxt.reason = modified_reason; + pectxt.port = prt; if (state & ERTS_PORT_SFLG_DISTRIBUTION) { DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); @@ -6952,7 +6977,8 @@ static int do_driver_monitor_process(Port *prt, prt->common.id, process, NIL, THE_NON_VALUE); - if (!erts_proc_sig_send_monitor(&mdp->u.target, process)) { + if (!erts_proc_sig_send_monitor(&prt->common, prt->common.id, + &mdp->u.target, process)) { erts_monitor_release_both(mdp); return 1; } @@ -7001,7 +7027,8 @@ static int do_driver_demonitor_process(Port *prt, const ErlDrvMonitor *monitor) return 1; erts_monitor_tree_delete(&ERTS_P_MONITORS(prt), mon); - erts_proc_sig_send_demonitor(mon); + erts_proc_sig_send_demonitor(&prt->common, prt->common.id, 0, mon); + return 0; } diff --git a/erts/emulator/test/signal_SUITE.erl b/erts/emulator/test/signal_SUITE.erl index e746a13d37..22b0e1dee8 100644 --- a/erts/emulator/test/signal_SUITE.erl +++ b/erts/emulator/test/signal_SUITE.erl @@ -41,7 +41,8 @@ busy_dist_demonitor_signal/1, busy_dist_down_signal/1, busy_dist_spawn_reply_signal/1, - busy_dist_unlink_ack_signal/1]). + busy_dist_unlink_ack_signal/1, + monitor_order/1]). init_per_testcase(Func, Config) when is_atom(Func), is_list(Config) -> [{testcase, Func}|Config]. @@ -67,7 +68,8 @@ all() -> busy_dist_demonitor_signal, busy_dist_down_signal, busy_dist_spawn_reply_signal, - busy_dist_unlink_ack_signal]. + busy_dist_unlink_ack_signal, + monitor_order]. %% Test that exit signals and messages are received in correct order xm_sig_order(Config) when is_list(Config) -> @@ -219,17 +221,24 @@ contended_signal_handling_make_ports(Drv, N, Ports) -> contended_signal_handling_make_ports(Drv, N-1, [Port|Ports]). busy_dist_exit_signal(Config) when is_list(Config) -> + ct:timetrap({seconds, 10}), + BusyTime = 1000, {ok, BusyChannelPeer, BusyChannelNode} = ?CT_PEER(), {ok, OtherPeer, OtherNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]), Tester = self(), - Exiter = spawn(BusyChannelNode, - fun () -> - pong = net_adm:ping(OtherNode), - Tester ! {self(), alive}, - receive after infinity -> ok end - end), - receive {Exiter, alive} -> ok end, + {Exiter,MRef} = spawn_monitor(BusyChannelNode, + fun () -> + pong = net_adm:ping(OtherNode), + Tester ! {self(), alive}, + receive after infinity -> ok end + end), + receive + {Exiter, alive} -> + erlang:demonitor(MRef, [flush]); + {'DOWN', MRef, process, Why, normal} -> + ct:fail({exiter_died, Why}) + end, Linker = spawn_link(OtherNode, fun () -> process_flag(trap_exit, true), @@ -242,7 +251,7 @@ busy_dist_exit_signal(Config) when is_list(Config) -> exit({unexpected_message, Unexpected}) end end), - make_busy(BusyChannelNode, OtherNode, 1000), + make_busy(BusyChannelNode, OtherNode, BusyTime), exit(Exiter, tester_killed_me), receive {Linker, got_exiter_exit_message} -> @@ -257,6 +266,8 @@ busy_dist_exit_signal(Config) when is_list(Config) -> ok. busy_dist_demonitor_signal(Config) when is_list(Config) -> + ct:timetrap({seconds, 10}), + BusyTime = 1000, {ok, BusyChannelPeer, BusyChannelNode} = ?CT_PEER(), {ok, OtherPeer, OtherNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]), @@ -291,7 +302,7 @@ busy_dist_demonitor_signal(Config) when is_list(Config) -> end), Demonitorer ! {self(), monitor, Demonitoree}, receive {Demonitoree, monitored} -> ok end, - make_busy(BusyChannelNode, OtherNode, 1000), + make_busy(BusyChannelNode, OtherNode, BusyTime), exit(Demonitorer, tester_killed_me), receive {Demonitoree, got_demonitorer_demonitor_signal} -> @@ -306,17 +317,24 @@ busy_dist_demonitor_signal(Config) when is_list(Config) -> ok. busy_dist_down_signal(Config) when is_list(Config) -> + ct:timetrap({seconds, 10}), + BusyTime = 1000, {ok, BusyChannelPeer, BusyChannelNode} = ?CT_PEER(), {ok, OtherPeer, OtherNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]), Tester = self(), - Exiter = spawn(BusyChannelNode, - fun () -> - pong = net_adm:ping(OtherNode), - Tester ! {self(), alive}, - receive after infinity -> ok end - end), - receive {Exiter, alive} -> ok end, + {Exiter,MRef} = spawn_monitor(BusyChannelNode, + fun () -> + pong = net_adm:ping(OtherNode), + Tester ! {self(), alive}, + receive after infinity -> ok end + end), + receive + {Exiter, alive} -> + erlang:demonitor(MRef, [flush]); + {'DOWN', MRef, process, Why, normal} -> + ct:fail({exiter_died, Why}) + end, Monitorer = spawn_link(OtherNode, fun () -> process_flag(trap_exit, true), @@ -329,7 +347,7 @@ busy_dist_down_signal(Config) when is_list(Config) -> exit({unexpected_message, Unexpected}) end end), - make_busy(BusyChannelNode, OtherNode, 1000), + make_busy(BusyChannelNode, OtherNode, BusyTime), exit(Exiter, tester_killed_me), receive {Monitorer, got_exiter_down_message} -> @@ -344,6 +362,8 @@ busy_dist_down_signal(Config) when is_list(Config) -> ok. busy_dist_spawn_reply_signal(Config) when is_list(Config) -> + ct:timetrap({seconds, 10}), + BusyTime = 1000, {ok, BusyChannelPeer, BusyChannelNode} = ?CT_PEER(), {ok, OtherPeer, OtherNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]), @@ -365,7 +385,7 @@ busy_dist_spawn_reply_signal(Config) when is_list(Config) -> end end), receive {Spawner, ready} -> ok end, - make_busy(BusyChannelNode, OtherNode, 1000), + make_busy(BusyChannelNode, OtherNode, BusyTime), Spawner ! {self(), go}, receive {Spawner, got_spawn_reply_message} -> @@ -385,17 +405,24 @@ busy_dist_spawn_reply_signal(Config) when is_list(Config) -> id}). busy_dist_unlink_ack_signal(Config) when is_list(Config) -> + ct:timetrap({seconds, 10}), + BusyTime = 1000, {ok, BusyChannelPeer, BusyChannelNode} = ?CT_PEER(), {ok, OtherPeer, OtherNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]), Tester = self(), - Unlinkee = spawn(BusyChannelNode, - fun () -> - pong = net_adm:ping(OtherNode), - Tester ! {self(), alive}, - receive after infinity -> ok end - end), - receive {Unlinkee, alive} -> ok end, + {Unlinkee,MRef} = spawn_monitor(BusyChannelNode, + fun () -> + pong = net_adm:ping(OtherNode), + Tester ! {self(), alive}, + receive after infinity -> ok end + end), + receive + {Unlinkee, alive} -> + erlang:demonitor(MRef, [flush]); + {'DOWN', MRef, process, Why, normal} -> + ct:fail({unlinkee_died, Why}) + end, Unlinker = spawn_link(OtherNode, fun () -> erts_debug:set_internal_state(available_internal_state, true), @@ -418,7 +445,7 @@ busy_dist_unlink_ack_signal(Config) when is_list(Config) -> Tester ! {self(), got_unlink_ack_signal} end), receive {Unlinker, ready} -> ok end, - make_busy(BusyChannelNode, OtherNode, 1000), + make_busy(BusyChannelNode, OtherNode, BusyTime), Unlinker ! {self(), go}, receive {Unlinker, got_unlink_ack_signal} -> @@ -432,6 +459,37 @@ busy_dist_unlink_ack_signal(Config) when is_list(Config) -> peer:stop(OtherPeer), ok. +%% Monitors could be reordered relative to message signals when the parallel +%% signal sending optimization was active. +monitor_order(_Config) -> + process_flag(message_queue_data, off_heap), + monitor_order_1(10). + +monitor_order_1(0) -> + ok; +monitor_order_1(N) -> + Self = self(), + {Pid, MRef} = spawn_monitor(fun() -> + receive + MRef -> + %% The first message sets up + %% the parallel signal buffer, + %% the second uses it. + Self ! {self(), MRef, first}, + Self ! {self(), MRef, second} + end, + exit(normal) + end), + Pid ! MRef, + receive + {'DOWN', MRef, process, _, normal} -> + ct:fail("Down signal arrived before second message!"); + {Pid, MRef, second} -> + receive {Pid, MRef, first} -> ok end, + erlang:demonitor(MRef, [flush]), + monitor_order_1(N - 1) + end. + %% %% -- Internal utils -------------------------------------------------------- %% -- cgit v1.2.1