summaryrefslogtreecommitdiff
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
authorJohn Högberg <john@erlang.org>2022-02-23 15:00:22 +0100
committerJohn Högberg <john@erlang.org>2022-03-08 21:46:19 +0100
commit49681bd7458649b34567bdad6918bf579846d80e (patch)
treea3e9e0edd888ce1b11e2b6a0d2bf95a3651b05be /erts/emulator/beam/io.c
parent8511ce01c776ea9d50467cd83ef20d4cf5c646ea (diff)
downloaderlang-49681bd7458649b34567bdad6918bf579846d80e.tar.gz
erts: Fix parallel signal optimization
When the parallel signal optimization was enabled and signals were sent without a known sender (e.g. a monitor-by-name firing), they always landed in signal slot 0, which was rarely the same slot as messages sent from the same process, breaking our signal order guarantees. This commit fixes that by always keeping track of the sender regardless of how and where signal are sent.
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c119
1 files changed, 73 insertions, 46 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index e1a58a1262..8014e7bd08 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -819,7 +819,8 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
port_lnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, pid);
erts_link_tree_insert(&ERTS_P_LINKS(port), port_lnk);
proc_lnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, port->common.id);
- if (!erts_proc_sig_send_link(NULL, pid, proc_lnk)) {
+ if (!erts_proc_sig_send_link(&port->common, port->common.id,
+ pid, proc_lnk)) {
erts_link_tree_delete(&ERTS_P_LINKS(port), port_lnk);
erts_link_internal_release(proc_lnk);
erts_link_internal_release(port_lnk);
@@ -1230,7 +1231,7 @@ send_badsig(Port *prt) {
ERTS_CHK_NO_PROC_LOCKS;
ERTS_LC_ASSERT(erts_get_scheduler_id());
ASSERT(is_internal_pid(connected));
- erts_proc_sig_send_exit(NULL, prt->common.id, connected,
+ erts_proc_sig_send_exit(&prt->common, prt->common.id, connected,
am_badsig, NIL, 0);
} /* send_badsig */
@@ -2350,7 +2351,8 @@ set_port_connected(int bang_op,
if (created) {
ErtsLink *olnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT,
prt->common.id);
- if (!erts_proc_sig_send_link(NULL, connect, olnk)) {
+ if (!erts_proc_sig_send_link(&prt->common, prt->common.id,
+ connect, olnk)) {
erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk);
erts_link_internal_release(lnk);
erts_link_internal_release(olnk);
@@ -2471,39 +2473,47 @@ erts_port_connect(Process *c_p,
}
static void
-port_unlink_failure(Eterm port_id, ErtsSigUnlinkOp *sulnk)
+port_unlink_failure(Port *prt, Eterm port_id, ErtsSigUnlinkOp *sulnk)
{
- erts_proc_sig_send_unlink_ack(NULL, port_id, sulnk);
+ erts_proc_sig_send_unlink_ack(prt ? &prt->common : NULL, port_id, sulnk);
}
static void
port_unlink(Port *prt, erts_aint32_t state, ErtsSigUnlinkOp *sulnk)
{
- if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
- port_unlink_failure(prt->common.id, sulnk);
- else {
+ if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
+ port_unlink_failure(prt, prt->common.id, sulnk);
+ } else {
ErtsILink *ilnk;
+
ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(prt),
sulnk->from);
+
if (ilnk && !ilnk->unlinking) {
if (IS_TRACED_FL(prt, F_TRACE_PORTS))
trace_port(prt, am_getting_unlinked, sulnk->from);
erts_link_tree_delete(&ERTS_P_LINKS(prt), &ilnk->link);
erts_link_internal_release(&ilnk->link);
}
- erts_proc_sig_send_unlink_ack(NULL, prt->common.id, sulnk);
+
+ erts_proc_sig_send_unlink_ack(&prt->common, prt->common.id, sulnk);
}
}
static int
port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
{
- if (op == ERTS_PROC2PORT_SIG_EXEC)
- port_unlink(prt, state, sigdp->u.unlink.sulnk);
- else
- port_unlink_failure(sigdp->u.unlink.port_id, sigdp->u.unlink.sulnk);
- if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ if (op == ERTS_PROC2PORT_SIG_EXEC) {
+ port_unlink(prt, state, sigdp->u.unlink.sulnk);
+ } else {
+ port_unlink_failure(prt, sigdp->u.unlink.port_id,
+ sigdp->u.unlink.sulnk);
+ }
+
+ if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) {
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ }
+
return ERTS_PORT_REDS_UNLINK;
}
@@ -2528,7 +2538,7 @@ erts_port_unlink(Process *c_p, Port *prt, ErtsSigUnlinkOp *sulnk, Eterm *refp)
BUMP_REDS(c_p, ERTS_PORT_REDS_UNLINK);
return ERTS_PORT_OP_DONE;
case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
- port_unlink_failure(prt->common.id, sulnk);
+ port_unlink_failure(prt, prt->common.id, sulnk);
return ERTS_PORT_OP_DROPPED;
default:
/* Schedule call instead... */
@@ -2628,17 +2638,17 @@ erts_port_unlink_ack(Process *c_p, Port *prt, ErtsSigUnlinkOp *sulnk)
}
static void
-port_link_failure(Eterm port_id, ErtsLink *lnk)
+port_link_failure(Port *port, Eterm port_id, ErtsLink *lnk)
{
- erts_proc_sig_send_link_exit(NULL, port_id, lnk, am_noproc, NIL);
+ erts_proc_sig_send_link_exit(&port->common, port_id, lnk, am_noproc, NIL);
}
static void
port_link(Port *prt, erts_aint32_t state, ErtsLink *nlnk)
{
- if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
- port_link_failure(prt->common.id, nlnk);
- else {
+ if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
+ port_link_failure(prt, prt->common.id, nlnk);
+ } else {
ErtsLink *lnk;
lnk = erts_link_tree_lookup_insert(&ERTS_P_LINKS(prt), nlnk);
if (lnk)
@@ -2651,12 +2661,16 @@ port_link(Port *prt, erts_aint32_t state, ErtsLink *nlnk)
static int
port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
{
- if (op == ERTS_PROC2PORT_SIG_EXEC)
- port_link(prt, state, sigdp->u.link.lnk);
- else
- port_link_failure(sigdp->u.link.port_id, sigdp->u.link.lnk);
- if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ if (op == ERTS_PROC2PORT_SIG_EXEC) {
+ port_link(prt, state, sigdp->u.link.lnk);
+ } else {
+ port_link_failure(prt, sigdp->u.link.port_id, sigdp->u.link.lnk);
+ }
+
+ if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) {
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ }
+
return ERTS_PORT_REDS_LINK;
}
@@ -2701,9 +2715,11 @@ erts_port_link(Process *c_p, Port *prt, ErtsLink *lnk, Eterm *refp)
}
static void
-port_monitor_failure(Eterm port_id, ErtsMonitor *mon)
+port_monitor_failure(Port *prt, Eterm port_id, ErtsMonitor *mon)
{
- erts_proc_sig_send_monitor_down(mon, am_noproc);
+ ASSERT(prt == NULL || prt->common.id == port_id);
+ erts_proc_sig_send_monitor_down(prt ? &prt->common : NULL, port_id,
+ mon, am_noproc);
}
/* Origin wants to monitor port Prt. State contains possible error, which has
@@ -2713,9 +2729,10 @@ static void
port_monitor(Port *prt, erts_aint32_t state, ErtsMonitor *mon)
{
ASSERT(prt);
- if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
- port_monitor_failure(prt->common.id, mon);
- else {
+
+ if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
+ port_monitor_failure(prt, prt->common.id, mon);
+ } else {
ASSERT(erts_monitor_is_target(mon));
erts_monitor_list_insert(&ERTS_P_LT_MONITORS(prt), mon);
}
@@ -2725,11 +2742,12 @@ static int
port_sig_monitor(Port *prt, erts_aint32_t state, int op,
ErtsProc2PortSigData *sigdp)
{
- if (op == ERTS_PROC2PORT_SIG_EXEC)
+ if (op == ERTS_PROC2PORT_SIG_EXEC) {
port_monitor(prt, state, sigdp->u.monitor.mon);
- else
- port_monitor_failure(sigdp->u.monitor.port_id,
- sigdp->u.monitor.mon);
+ } else {
+ port_monitor_failure(prt, sigdp->u.monitor.port_id, sigdp->u.monitor.mon);
+ }
+
return ERTS_PORT_REDS_MONITOR;
}
@@ -2858,7 +2876,7 @@ unlink_proc(Port *prt, Eterm pid)
ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(prt),
pid);
if (ilnk && !ilnk->unlinking) {
- Uint64 id = erts_proc_sig_send_unlink(NULL,
+ Uint64 id = erts_proc_sig_send_unlink(&prt->common,
prt->common.id,
&ilnk->link);
if (id != 0)
@@ -3767,14 +3785,16 @@ erts_terminate_port(Port *pp)
}
typedef struct {
- Eterm port_id;
+ Port *port;
Eterm reason;
} ErtsPortExitContext;
static int link_port_exit(ErtsLink *lnk, void *vpectxt, Sint reds)
{
ErtsPortExitContext *pectxt = vpectxt;
- erts_proc_sig_send_link_exit(NULL, pectxt->port_id,
+ Port *port = pectxt->port;
+
+ erts_proc_sig_send_link_exit(&port->common, port->common.id,
lnk, pectxt->reason, NIL);
return 1;
}
@@ -3782,10 +3802,15 @@ static int link_port_exit(ErtsLink *lnk, void *vpectxt, Sint reds)
static int monitor_port_exit(ErtsMonitor *mon, void *vpectxt, Sint reds)
{
ErtsPortExitContext *pectxt = vpectxt;
- if (erts_monitor_is_target(mon))
- erts_proc_sig_send_monitor_down(mon, pectxt->reason);
- else
- erts_proc_sig_send_demonitor(mon);
+ Port *port = pectxt->port;
+
+ if (erts_monitor_is_target(mon)) {
+ erts_proc_sig_send_monitor_down(&port->common, port->common.id, mon,
+ pectxt->reason);
+ } else {
+ erts_proc_sig_send_demonitor(&port->common, port->common.id, 0, mon);
+ }
+
return 1;
}
@@ -3866,8 +3891,8 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed,
if (prt->common.u.alive.reg != NULL)
(void) erts_unregister_name(NULL, 0, prt, prt->common.u.alive.reg->name);
- pectxt.port_id = prt->common.id;
pectxt.reason = modified_reason;
+ pectxt.port = prt;
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
@@ -6952,7 +6977,8 @@ static int do_driver_monitor_process(Port *prt,
prt->common.id, process, NIL,
THE_NON_VALUE);
- if (!erts_proc_sig_send_monitor(&mdp->u.target, process)) {
+ if (!erts_proc_sig_send_monitor(&prt->common, prt->common.id,
+ &mdp->u.target, process)) {
erts_monitor_release_both(mdp);
return 1;
}
@@ -7001,7 +7027,8 @@ static int do_driver_demonitor_process(Port *prt, const ErlDrvMonitor *monitor)
return 1;
erts_monitor_tree_delete(&ERTS_P_MONITORS(prt), mon);
- erts_proc_sig_send_demonitor(mon);
+ erts_proc_sig_send_demonitor(&prt->common, prt->common.id, 0, mon);
+
return 0;
}