diff options
author | John Högberg <john@erlang.org> | 2022-02-23 15:00:22 +0100 |
---|---|---|
committer | John Högberg <john@erlang.org> | 2022-03-08 21:46:19 +0100 |
commit | 49681bd7458649b34567bdad6918bf579846d80e (patch) | |
tree | a3e9e0edd888ce1b11e2b6a0d2bf95a3651b05be /erts/emulator/beam/io.c | |
parent | 8511ce01c776ea9d50467cd83ef20d4cf5c646ea (diff) | |
download | erlang-49681bd7458649b34567bdad6918bf579846d80e.tar.gz |
erts: Fix parallel signal optimization
When the parallel signal optimization was enabled and signals were
sent without a known sender (e.g. a monitor-by-name firing), they
always landed in signal slot 0, which was rarely the same slot as
messages sent from the same process, breaking our signal order
guarantees.
This commit fixes that by always keeping track of the sender
regardless of how and where signal are sent.
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 119 |
1 files changed, 73 insertions, 46 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index e1a58a1262..8014e7bd08 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -819,7 +819,8 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ port_lnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, pid); erts_link_tree_insert(&ERTS_P_LINKS(port), port_lnk); proc_lnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, port->common.id); - if (!erts_proc_sig_send_link(NULL, pid, proc_lnk)) { + if (!erts_proc_sig_send_link(&port->common, port->common.id, + pid, proc_lnk)) { erts_link_tree_delete(&ERTS_P_LINKS(port), port_lnk); erts_link_internal_release(proc_lnk); erts_link_internal_release(port_lnk); @@ -1230,7 +1231,7 @@ send_badsig(Port *prt) { ERTS_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_get_scheduler_id()); ASSERT(is_internal_pid(connected)); - erts_proc_sig_send_exit(NULL, prt->common.id, connected, + erts_proc_sig_send_exit(&prt->common, prt->common.id, connected, am_badsig, NIL, 0); } /* send_badsig */ @@ -2350,7 +2351,8 @@ set_port_connected(int bang_op, if (created) { ErtsLink *olnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, prt->common.id); - if (!erts_proc_sig_send_link(NULL, connect, olnk)) { + if (!erts_proc_sig_send_link(&prt->common, prt->common.id, + connect, olnk)) { erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk); erts_link_internal_release(lnk); erts_link_internal_release(olnk); @@ -2471,39 +2473,47 @@ erts_port_connect(Process *c_p, } static void -port_unlink_failure(Eterm port_id, ErtsSigUnlinkOp *sulnk) +port_unlink_failure(Port *prt, Eterm port_id, ErtsSigUnlinkOp *sulnk) { - erts_proc_sig_send_unlink_ack(NULL, port_id, sulnk); + erts_proc_sig_send_unlink_ack(prt ? &prt->common : NULL, port_id, sulnk); } static void port_unlink(Port *prt, erts_aint32_t state, ErtsSigUnlinkOp *sulnk) { - if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) - port_unlink_failure(prt->common.id, sulnk); - else { + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + port_unlink_failure(prt, prt->common.id, sulnk); + } else { ErtsILink *ilnk; + ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(prt), sulnk->from); + if (ilnk && !ilnk->unlinking) { if (IS_TRACED_FL(prt, F_TRACE_PORTS)) trace_port(prt, am_getting_unlinked, sulnk->from); erts_link_tree_delete(&ERTS_P_LINKS(prt), &ilnk->link); erts_link_internal_release(&ilnk->link); } - erts_proc_sig_send_unlink_ack(NULL, prt->common.id, sulnk); + + erts_proc_sig_send_unlink_ack(&prt->common, prt->common.id, sulnk); } } static int port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - if (op == ERTS_PROC2PORT_SIG_EXEC) - port_unlink(prt, state, sigdp->u.unlink.sulnk); - else - port_unlink_failure(sigdp->u.unlink.port_id, sigdp->u.unlink.sulnk); - if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) - port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + if (op == ERTS_PROC2PORT_SIG_EXEC) { + port_unlink(prt, state, sigdp->u.unlink.sulnk); + } else { + port_unlink_failure(prt, sigdp->u.unlink.port_id, + sigdp->u.unlink.sulnk); + } + + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) { + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + } + return ERTS_PORT_REDS_UNLINK; } @@ -2528,7 +2538,7 @@ erts_port_unlink(Process *c_p, Port *prt, ErtsSigUnlinkOp *sulnk, Eterm *refp) BUMP_REDS(c_p, ERTS_PORT_REDS_UNLINK); return ERTS_PORT_OP_DONE; case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: - port_unlink_failure(prt->common.id, sulnk); + port_unlink_failure(prt, prt->common.id, sulnk); return ERTS_PORT_OP_DROPPED; default: /* Schedule call instead... */ @@ -2628,17 +2638,17 @@ erts_port_unlink_ack(Process *c_p, Port *prt, ErtsSigUnlinkOp *sulnk) } static void -port_link_failure(Eterm port_id, ErtsLink *lnk) +port_link_failure(Port *port, Eterm port_id, ErtsLink *lnk) { - erts_proc_sig_send_link_exit(NULL, port_id, lnk, am_noproc, NIL); + erts_proc_sig_send_link_exit(&port->common, port_id, lnk, am_noproc, NIL); } static void port_link(Port *prt, erts_aint32_t state, ErtsLink *nlnk) { - if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) - port_link_failure(prt->common.id, nlnk); - else { + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + port_link_failure(prt, prt->common.id, nlnk); + } else { ErtsLink *lnk; lnk = erts_link_tree_lookup_insert(&ERTS_P_LINKS(prt), nlnk); if (lnk) @@ -2651,12 +2661,16 @@ port_link(Port *prt, erts_aint32_t state, ErtsLink *nlnk) static int port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - if (op == ERTS_PROC2PORT_SIG_EXEC) - port_link(prt, state, sigdp->u.link.lnk); - else - port_link_failure(sigdp->u.link.port_id, sigdp->u.link.lnk); - if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) - port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + if (op == ERTS_PROC2PORT_SIG_EXEC) { + port_link(prt, state, sigdp->u.link.lnk); + } else { + port_link_failure(prt, sigdp->u.link.port_id, sigdp->u.link.lnk); + } + + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) { + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + } + return ERTS_PORT_REDS_LINK; } @@ -2701,9 +2715,11 @@ erts_port_link(Process *c_p, Port *prt, ErtsLink *lnk, Eterm *refp) } static void -port_monitor_failure(Eterm port_id, ErtsMonitor *mon) +port_monitor_failure(Port *prt, Eterm port_id, ErtsMonitor *mon) { - erts_proc_sig_send_monitor_down(mon, am_noproc); + ASSERT(prt == NULL || prt->common.id == port_id); + erts_proc_sig_send_monitor_down(prt ? &prt->common : NULL, port_id, + mon, am_noproc); } /* Origin wants to monitor port Prt. State contains possible error, which has @@ -2713,9 +2729,10 @@ static void port_monitor(Port *prt, erts_aint32_t state, ErtsMonitor *mon) { ASSERT(prt); - if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) - port_monitor_failure(prt->common.id, mon); - else { + + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + port_monitor_failure(prt, prt->common.id, mon); + } else { ASSERT(erts_monitor_is_target(mon)); erts_monitor_list_insert(&ERTS_P_LT_MONITORS(prt), mon); } @@ -2725,11 +2742,12 @@ static int port_sig_monitor(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - if (op == ERTS_PROC2PORT_SIG_EXEC) + if (op == ERTS_PROC2PORT_SIG_EXEC) { port_monitor(prt, state, sigdp->u.monitor.mon); - else - port_monitor_failure(sigdp->u.monitor.port_id, - sigdp->u.monitor.mon); + } else { + port_monitor_failure(prt, sigdp->u.monitor.port_id, sigdp->u.monitor.mon); + } + return ERTS_PORT_REDS_MONITOR; } @@ -2858,7 +2876,7 @@ unlink_proc(Port *prt, Eterm pid) ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(prt), pid); if (ilnk && !ilnk->unlinking) { - Uint64 id = erts_proc_sig_send_unlink(NULL, + Uint64 id = erts_proc_sig_send_unlink(&prt->common, prt->common.id, &ilnk->link); if (id != 0) @@ -3767,14 +3785,16 @@ erts_terminate_port(Port *pp) } typedef struct { - Eterm port_id; + Port *port; Eterm reason; } ErtsPortExitContext; static int link_port_exit(ErtsLink *lnk, void *vpectxt, Sint reds) { ErtsPortExitContext *pectxt = vpectxt; - erts_proc_sig_send_link_exit(NULL, pectxt->port_id, + Port *port = pectxt->port; + + erts_proc_sig_send_link_exit(&port->common, port->common.id, lnk, pectxt->reason, NIL); return 1; } @@ -3782,10 +3802,15 @@ static int link_port_exit(ErtsLink *lnk, void *vpectxt, Sint reds) static int monitor_port_exit(ErtsMonitor *mon, void *vpectxt, Sint reds) { ErtsPortExitContext *pectxt = vpectxt; - if (erts_monitor_is_target(mon)) - erts_proc_sig_send_monitor_down(mon, pectxt->reason); - else - erts_proc_sig_send_demonitor(mon); + Port *port = pectxt->port; + + if (erts_monitor_is_target(mon)) { + erts_proc_sig_send_monitor_down(&port->common, port->common.id, mon, + pectxt->reason); + } else { + erts_proc_sig_send_demonitor(&port->common, port->common.id, 0, mon); + } + return 1; } @@ -3866,8 +3891,8 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed, if (prt->common.u.alive.reg != NULL) (void) erts_unregister_name(NULL, 0, prt, prt->common.u.alive.reg->name); - pectxt.port_id = prt->common.id; pectxt.reason = modified_reason; + pectxt.port = prt; if (state & ERTS_PORT_SFLG_DISTRIBUTION) { DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); @@ -6952,7 +6977,8 @@ static int do_driver_monitor_process(Port *prt, prt->common.id, process, NIL, THE_NON_VALUE); - if (!erts_proc_sig_send_monitor(&mdp->u.target, process)) { + if (!erts_proc_sig_send_monitor(&prt->common, prt->common.id, + &mdp->u.target, process)) { erts_monitor_release_both(mdp); return 1; } @@ -7001,7 +7027,8 @@ static int do_driver_demonitor_process(Port *prt, const ErlDrvMonitor *monitor) return 1; erts_monitor_tree_delete(&ERTS_P_MONITORS(prt), mon); - erts_proc_sig_send_demonitor(mon); + erts_proc_sig_send_demonitor(&prt->common, prt->common.id, 0, mon); + return 0; } |