diff options
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r-- | erts/emulator/beam/io.c | 119 |
1 files changed, 73 insertions, 46 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index e1a58a1262..8014e7bd08 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -819,7 +819,8 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */ port_lnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, pid); erts_link_tree_insert(&ERTS_P_LINKS(port), port_lnk); proc_lnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, port->common.id); - if (!erts_proc_sig_send_link(NULL, pid, proc_lnk)) { + if (!erts_proc_sig_send_link(&port->common, port->common.id, + pid, proc_lnk)) { erts_link_tree_delete(&ERTS_P_LINKS(port), port_lnk); erts_link_internal_release(proc_lnk); erts_link_internal_release(port_lnk); @@ -1230,7 +1231,7 @@ send_badsig(Port *prt) { ERTS_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_get_scheduler_id()); ASSERT(is_internal_pid(connected)); - erts_proc_sig_send_exit(NULL, prt->common.id, connected, + erts_proc_sig_send_exit(&prt->common, prt->common.id, connected, am_badsig, NIL, 0); } /* send_badsig */ @@ -2350,7 +2351,8 @@ set_port_connected(int bang_op, if (created) { ErtsLink *olnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, prt->common.id); - if (!erts_proc_sig_send_link(NULL, connect, olnk)) { + if (!erts_proc_sig_send_link(&prt->common, prt->common.id, + connect, olnk)) { erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk); erts_link_internal_release(lnk); erts_link_internal_release(olnk); @@ -2471,39 +2473,47 @@ erts_port_connect(Process *c_p, } static void -port_unlink_failure(Eterm port_id, ErtsSigUnlinkOp *sulnk) +port_unlink_failure(Port *prt, Eterm port_id, ErtsSigUnlinkOp *sulnk) { - erts_proc_sig_send_unlink_ack(NULL, port_id, sulnk); + erts_proc_sig_send_unlink_ack(prt ? &prt->common : NULL, port_id, sulnk); } static void port_unlink(Port *prt, erts_aint32_t state, ErtsSigUnlinkOp *sulnk) { - if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) - port_unlink_failure(prt->common.id, sulnk); - else { + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + port_unlink_failure(prt, prt->common.id, sulnk); + } else { ErtsILink *ilnk; + ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(prt), sulnk->from); + if (ilnk && !ilnk->unlinking) { if (IS_TRACED_FL(prt, F_TRACE_PORTS)) trace_port(prt, am_getting_unlinked, sulnk->from); erts_link_tree_delete(&ERTS_P_LINKS(prt), &ilnk->link); erts_link_internal_release(&ilnk->link); } - erts_proc_sig_send_unlink_ack(NULL, prt->common.id, sulnk); + + erts_proc_sig_send_unlink_ack(&prt->common, prt->common.id, sulnk); } } static int port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - if (op == ERTS_PROC2PORT_SIG_EXEC) - port_unlink(prt, state, sigdp->u.unlink.sulnk); - else - port_unlink_failure(sigdp->u.unlink.port_id, sigdp->u.unlink.sulnk); - if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) - port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + if (op == ERTS_PROC2PORT_SIG_EXEC) { + port_unlink(prt, state, sigdp->u.unlink.sulnk); + } else { + port_unlink_failure(prt, sigdp->u.unlink.port_id, + sigdp->u.unlink.sulnk); + } + + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) { + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + } + return ERTS_PORT_REDS_UNLINK; } @@ -2528,7 +2538,7 @@ erts_port_unlink(Process *c_p, Port *prt, ErtsSigUnlinkOp *sulnk, Eterm *refp) BUMP_REDS(c_p, ERTS_PORT_REDS_UNLINK); return ERTS_PORT_OP_DONE; case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: - port_unlink_failure(prt->common.id, sulnk); + port_unlink_failure(prt, prt->common.id, sulnk); return ERTS_PORT_OP_DROPPED; default: /* Schedule call instead... */ @@ -2628,17 +2638,17 @@ erts_port_unlink_ack(Process *c_p, Port *prt, ErtsSigUnlinkOp *sulnk) } static void -port_link_failure(Eterm port_id, ErtsLink *lnk) +port_link_failure(Port *port, Eterm port_id, ErtsLink *lnk) { - erts_proc_sig_send_link_exit(NULL, port_id, lnk, am_noproc, NIL); + erts_proc_sig_send_link_exit(&port->common, port_id, lnk, am_noproc, NIL); } static void port_link(Port *prt, erts_aint32_t state, ErtsLink *nlnk) { - if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) - port_link_failure(prt->common.id, nlnk); - else { + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + port_link_failure(prt, prt->common.id, nlnk); + } else { ErtsLink *lnk; lnk = erts_link_tree_lookup_insert(&ERTS_P_LINKS(prt), nlnk); if (lnk) @@ -2651,12 +2661,16 @@ port_link(Port *prt, erts_aint32_t state, ErtsLink *nlnk) static int port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - if (op == ERTS_PROC2PORT_SIG_EXEC) - port_link(prt, state, sigdp->u.link.lnk); - else - port_link_failure(sigdp->u.link.port_id, sigdp->u.link.lnk); - if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) - port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + if (op == ERTS_PROC2PORT_SIG_EXEC) { + port_link(prt, state, sigdp->u.link.lnk); + } else { + port_link_failure(prt, sigdp->u.link.port_id, sigdp->u.link.lnk); + } + + if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) { + port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt); + } + return ERTS_PORT_REDS_LINK; } @@ -2701,9 +2715,11 @@ erts_port_link(Process *c_p, Port *prt, ErtsLink *lnk, Eterm *refp) } static void -port_monitor_failure(Eterm port_id, ErtsMonitor *mon) +port_monitor_failure(Port *prt, Eterm port_id, ErtsMonitor *mon) { - erts_proc_sig_send_monitor_down(mon, am_noproc); + ASSERT(prt == NULL || prt->common.id == port_id); + erts_proc_sig_send_monitor_down(prt ? &prt->common : NULL, port_id, + mon, am_noproc); } /* Origin wants to monitor port Prt. State contains possible error, which has @@ -2713,9 +2729,10 @@ static void port_monitor(Port *prt, erts_aint32_t state, ErtsMonitor *mon) { ASSERT(prt); - if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) - port_monitor_failure(prt->common.id, mon); - else { + + if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) { + port_monitor_failure(prt, prt->common.id, mon); + } else { ASSERT(erts_monitor_is_target(mon)); erts_monitor_list_insert(&ERTS_P_LT_MONITORS(prt), mon); } @@ -2725,11 +2742,12 @@ static int port_sig_monitor(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) { - if (op == ERTS_PROC2PORT_SIG_EXEC) + if (op == ERTS_PROC2PORT_SIG_EXEC) { port_monitor(prt, state, sigdp->u.monitor.mon); - else - port_monitor_failure(sigdp->u.monitor.port_id, - sigdp->u.monitor.mon); + } else { + port_monitor_failure(prt, sigdp->u.monitor.port_id, sigdp->u.monitor.mon); + } + return ERTS_PORT_REDS_MONITOR; } @@ -2858,7 +2876,7 @@ unlink_proc(Port *prt, Eterm pid) ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(prt), pid); if (ilnk && !ilnk->unlinking) { - Uint64 id = erts_proc_sig_send_unlink(NULL, + Uint64 id = erts_proc_sig_send_unlink(&prt->common, prt->common.id, &ilnk->link); if (id != 0) @@ -3767,14 +3785,16 @@ erts_terminate_port(Port *pp) } typedef struct { - Eterm port_id; + Port *port; Eterm reason; } ErtsPortExitContext; static int link_port_exit(ErtsLink *lnk, void *vpectxt, Sint reds) { ErtsPortExitContext *pectxt = vpectxt; - erts_proc_sig_send_link_exit(NULL, pectxt->port_id, + Port *port = pectxt->port; + + erts_proc_sig_send_link_exit(&port->common, port->common.id, lnk, pectxt->reason, NIL); return 1; } @@ -3782,10 +3802,15 @@ static int link_port_exit(ErtsLink *lnk, void *vpectxt, Sint reds) static int monitor_port_exit(ErtsMonitor *mon, void *vpectxt, Sint reds) { ErtsPortExitContext *pectxt = vpectxt; - if (erts_monitor_is_target(mon)) - erts_proc_sig_send_monitor_down(mon, pectxt->reason); - else - erts_proc_sig_send_demonitor(mon); + Port *port = pectxt->port; + + if (erts_monitor_is_target(mon)) { + erts_proc_sig_send_monitor_down(&port->common, port->common.id, mon, + pectxt->reason); + } else { + erts_proc_sig_send_demonitor(&port->common, port->common.id, 0, mon); + } + return 1; } @@ -3866,8 +3891,8 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed, if (prt->common.u.alive.reg != NULL) (void) erts_unregister_name(NULL, 0, prt, prt->common.u.alive.reg->name); - pectxt.port_id = prt->common.id; pectxt.reason = modified_reason; + pectxt.port = prt; if (state & ERTS_PORT_SFLG_DISTRIBUTION) { DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); @@ -6952,7 +6977,8 @@ static int do_driver_monitor_process(Port *prt, prt->common.id, process, NIL, THE_NON_VALUE); - if (!erts_proc_sig_send_monitor(&mdp->u.target, process)) { + if (!erts_proc_sig_send_monitor(&prt->common, prt->common.id, + &mdp->u.target, process)) { erts_monitor_release_both(mdp); return 1; } @@ -7001,7 +7027,8 @@ static int do_driver_demonitor_process(Port *prt, const ErlDrvMonitor *monitor) return 1; erts_monitor_tree_delete(&ERTS_P_MONITORS(prt), mon); - erts_proc_sig_send_demonitor(mon); + erts_proc_sig_send_demonitor(&prt->common, prt->common.id, 0, mon); + return 0; } |