summaryrefslogtreecommitdiff
path: root/erts/emulator/beam/io.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/io.c')
-rw-r--r--erts/emulator/beam/io.c119
1 files changed, 73 insertions, 46 deletions
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index e1a58a1262..8014e7bd08 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -819,7 +819,8 @@ driver_create_port(ErlDrvPort creator_port_ix, /* Creating port */
port_lnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, pid);
erts_link_tree_insert(&ERTS_P_LINKS(port), port_lnk);
proc_lnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT, port->common.id);
- if (!erts_proc_sig_send_link(NULL, pid, proc_lnk)) {
+ if (!erts_proc_sig_send_link(&port->common, port->common.id,
+ pid, proc_lnk)) {
erts_link_tree_delete(&ERTS_P_LINKS(port), port_lnk);
erts_link_internal_release(proc_lnk);
erts_link_internal_release(port_lnk);
@@ -1230,7 +1231,7 @@ send_badsig(Port *prt) {
ERTS_CHK_NO_PROC_LOCKS;
ERTS_LC_ASSERT(erts_get_scheduler_id());
ASSERT(is_internal_pid(connected));
- erts_proc_sig_send_exit(NULL, prt->common.id, connected,
+ erts_proc_sig_send_exit(&prt->common, prt->common.id, connected,
am_badsig, NIL, 0);
} /* send_badsig */
@@ -2350,7 +2351,8 @@ set_port_connected(int bang_op,
if (created) {
ErtsLink *olnk = erts_link_internal_create(ERTS_LNK_TYPE_PORT,
prt->common.id);
- if (!erts_proc_sig_send_link(NULL, connect, olnk)) {
+ if (!erts_proc_sig_send_link(&prt->common, prt->common.id,
+ connect, olnk)) {
erts_link_tree_delete(&ERTS_P_LINKS(prt), lnk);
erts_link_internal_release(lnk);
erts_link_internal_release(olnk);
@@ -2471,39 +2473,47 @@ erts_port_connect(Process *c_p,
}
static void
-port_unlink_failure(Eterm port_id, ErtsSigUnlinkOp *sulnk)
+port_unlink_failure(Port *prt, Eterm port_id, ErtsSigUnlinkOp *sulnk)
{
- erts_proc_sig_send_unlink_ack(NULL, port_id, sulnk);
+ erts_proc_sig_send_unlink_ack(prt ? &prt->common : NULL, port_id, sulnk);
}
static void
port_unlink(Port *prt, erts_aint32_t state, ErtsSigUnlinkOp *sulnk)
{
- if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
- port_unlink_failure(prt->common.id, sulnk);
- else {
+ if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
+ port_unlink_failure(prt, prt->common.id, sulnk);
+ } else {
ErtsILink *ilnk;
+
ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(prt),
sulnk->from);
+
if (ilnk && !ilnk->unlinking) {
if (IS_TRACED_FL(prt, F_TRACE_PORTS))
trace_port(prt, am_getting_unlinked, sulnk->from);
erts_link_tree_delete(&ERTS_P_LINKS(prt), &ilnk->link);
erts_link_internal_release(&ilnk->link);
}
- erts_proc_sig_send_unlink_ack(NULL, prt->common.id, sulnk);
+
+ erts_proc_sig_send_unlink_ack(&prt->common, prt->common.id, sulnk);
}
}
static int
port_sig_unlink(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
{
- if (op == ERTS_PROC2PORT_SIG_EXEC)
- port_unlink(prt, state, sigdp->u.unlink.sulnk);
- else
- port_unlink_failure(sigdp->u.unlink.port_id, sigdp->u.unlink.sulnk);
- if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ if (op == ERTS_PROC2PORT_SIG_EXEC) {
+ port_unlink(prt, state, sigdp->u.unlink.sulnk);
+ } else {
+ port_unlink_failure(prt, sigdp->u.unlink.port_id,
+ sigdp->u.unlink.sulnk);
+ }
+
+ if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) {
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ }
+
return ERTS_PORT_REDS_UNLINK;
}
@@ -2528,7 +2538,7 @@ erts_port_unlink(Process *c_p, Port *prt, ErtsSigUnlinkOp *sulnk, Eterm *refp)
BUMP_REDS(c_p, ERTS_PORT_REDS_UNLINK);
return ERTS_PORT_OP_DONE;
case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
- port_unlink_failure(prt->common.id, sulnk);
+ port_unlink_failure(prt, prt->common.id, sulnk);
return ERTS_PORT_OP_DROPPED;
default:
/* Schedule call instead... */
@@ -2628,17 +2638,17 @@ erts_port_unlink_ack(Process *c_p, Port *prt, ErtsSigUnlinkOp *sulnk)
}
static void
-port_link_failure(Eterm port_id, ErtsLink *lnk)
+port_link_failure(Port *port, Eterm port_id, ErtsLink *lnk)
{
- erts_proc_sig_send_link_exit(NULL, port_id, lnk, am_noproc, NIL);
+ erts_proc_sig_send_link_exit(&port->common, port_id, lnk, am_noproc, NIL);
}
static void
port_link(Port *prt, erts_aint32_t state, ErtsLink *nlnk)
{
- if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
- port_link_failure(prt->common.id, nlnk);
- else {
+ if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
+ port_link_failure(prt, prt->common.id, nlnk);
+ } else {
ErtsLink *lnk;
lnk = erts_link_tree_lookup_insert(&ERTS_P_LINKS(prt), nlnk);
if (lnk)
@@ -2651,12 +2661,16 @@ port_link(Port *prt, erts_aint32_t state, ErtsLink *nlnk)
static int
port_sig_link(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
{
- if (op == ERTS_PROC2PORT_SIG_EXEC)
- port_link(prt, state, sigdp->u.link.lnk);
- else
- port_link_failure(sigdp->u.link.port_id, sigdp->u.link.lnk);
- if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
- port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ if (op == ERTS_PROC2PORT_SIG_EXEC) {
+ port_link(prt, state, sigdp->u.link.lnk);
+ } else {
+ port_link_failure(prt, sigdp->u.link.port_id, sigdp->u.link.lnk);
+ }
+
+ if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) {
+ port_sched_op_reply(sigdp->caller, sigdp->ref, am_true, prt);
+ }
+
return ERTS_PORT_REDS_LINK;
}
@@ -2701,9 +2715,11 @@ erts_port_link(Process *c_p, Port *prt, ErtsLink *lnk, Eterm *refp)
}
static void
-port_monitor_failure(Eterm port_id, ErtsMonitor *mon)
+port_monitor_failure(Port *prt, Eterm port_id, ErtsMonitor *mon)
{
- erts_proc_sig_send_monitor_down(mon, am_noproc);
+ ASSERT(prt == NULL || prt->common.id == port_id);
+ erts_proc_sig_send_monitor_down(prt ? &prt->common : NULL, port_id,
+ mon, am_noproc);
}
/* Origin wants to monitor port Prt. State contains possible error, which has
@@ -2713,9 +2729,10 @@ static void
port_monitor(Port *prt, erts_aint32_t state, ErtsMonitor *mon)
{
ASSERT(prt);
- if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
- port_monitor_failure(prt->common.id, mon);
- else {
+
+ if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP) {
+ port_monitor_failure(prt, prt->common.id, mon);
+ } else {
ASSERT(erts_monitor_is_target(mon));
erts_monitor_list_insert(&ERTS_P_LT_MONITORS(prt), mon);
}
@@ -2725,11 +2742,12 @@ static int
port_sig_monitor(Port *prt, erts_aint32_t state, int op,
ErtsProc2PortSigData *sigdp)
{
- if (op == ERTS_PROC2PORT_SIG_EXEC)
+ if (op == ERTS_PROC2PORT_SIG_EXEC) {
port_monitor(prt, state, sigdp->u.monitor.mon);
- else
- port_monitor_failure(sigdp->u.monitor.port_id,
- sigdp->u.monitor.mon);
+ } else {
+ port_monitor_failure(prt, sigdp->u.monitor.port_id, sigdp->u.monitor.mon);
+ }
+
return ERTS_PORT_REDS_MONITOR;
}
@@ -2858,7 +2876,7 @@ unlink_proc(Port *prt, Eterm pid)
ilnk = (ErtsILink *) erts_link_tree_lookup(ERTS_P_LINKS(prt),
pid);
if (ilnk && !ilnk->unlinking) {
- Uint64 id = erts_proc_sig_send_unlink(NULL,
+ Uint64 id = erts_proc_sig_send_unlink(&prt->common,
prt->common.id,
&ilnk->link);
if (id != 0)
@@ -3767,14 +3785,16 @@ erts_terminate_port(Port *pp)
}
typedef struct {
- Eterm port_id;
+ Port *port;
Eterm reason;
} ErtsPortExitContext;
static int link_port_exit(ErtsLink *lnk, void *vpectxt, Sint reds)
{
ErtsPortExitContext *pectxt = vpectxt;
- erts_proc_sig_send_link_exit(NULL, pectxt->port_id,
+ Port *port = pectxt->port;
+
+ erts_proc_sig_send_link_exit(&port->common, port->common.id,
lnk, pectxt->reason, NIL);
return 1;
}
@@ -3782,10 +3802,15 @@ static int link_port_exit(ErtsLink *lnk, void *vpectxt, Sint reds)
static int monitor_port_exit(ErtsMonitor *mon, void *vpectxt, Sint reds)
{
ErtsPortExitContext *pectxt = vpectxt;
- if (erts_monitor_is_target(mon))
- erts_proc_sig_send_monitor_down(mon, pectxt->reason);
- else
- erts_proc_sig_send_demonitor(mon);
+ Port *port = pectxt->port;
+
+ if (erts_monitor_is_target(mon)) {
+ erts_proc_sig_send_monitor_down(&port->common, port->common.id, mon,
+ pectxt->reason);
+ } else {
+ erts_proc_sig_send_demonitor(&port->common, port->common.id, 0, mon);
+ }
+
return 1;
}
@@ -3866,8 +3891,8 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed,
if (prt->common.u.alive.reg != NULL)
(void) erts_unregister_name(NULL, 0, prt, prt->common.u.alive.reg->name);
- pectxt.port_id = prt->common.id;
pectxt.reason = modified_reason;
+ pectxt.port = prt;
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
@@ -6952,7 +6977,8 @@ static int do_driver_monitor_process(Port *prt,
prt->common.id, process, NIL,
THE_NON_VALUE);
- if (!erts_proc_sig_send_monitor(&mdp->u.target, process)) {
+ if (!erts_proc_sig_send_monitor(&prt->common, prt->common.id,
+ &mdp->u.target, process)) {
erts_monitor_release_both(mdp);
return 1;
}
@@ -7001,7 +7027,8 @@ static int do_driver_demonitor_process(Port *prt, const ErlDrvMonitor *monitor)
return 1;
erts_monitor_tree_delete(&ERTS_P_MONITORS(prt), mon);
- erts_proc_sig_send_demonitor(mon);
+ erts_proc_sig_send_demonitor(&prt->common, prt->common.id, 0, mon);
+
return 0;
}