summaryrefslogtreecommitdiff
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c93
1 files changed, 53 insertions, 40 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index cab8c4b9eb..af2b03c65e 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -267,10 +267,23 @@ get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs)
static int monitor_connection_down(ErtsMonitor *mon, void *unused, Sint reds)
{
- if (erts_monitor_is_origin(mon))
- erts_proc_sig_send_demonitor(mon);
- else
- erts_proc_sig_send_monitor_down(mon, am_noconnection);
+ const ErtsMonitorData *data = erts_monitor_to_data(mon);
+ Eterm from;
+
+ if (erts_monitor_is_origin(mon)) {
+ from = data->u.target.other.item;
+ } else {
+ from = data->origin.other.item;
+ }
+
+ ASSERT(!is_internal_pid(from) && is_internal_pid(mon->other.item));
+
+ if (erts_monitor_is_origin(mon)) {
+ erts_proc_sig_send_demonitor(NULL, from, 0, mon);
+ } else {
+ erts_proc_sig_send_monitor_down(NULL, from, mon, am_noconnection);
+ }
+
return ERTS_MON_LNK_FIRE_REDS;
}
@@ -282,8 +295,7 @@ static int dist_pend_spawn_exit_connection_down(ErtsMonitor *mon, void *unused,
static int link_connection_down(ErtsLink *lnk, void *vdist, Sint reds)
{
- erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, lnk,
- am_noconnection, NIL);
+ erts_proc_sig_send_link_exit_noconnection(lnk);
return ERTS_MON_LNK_FIRE_REDS;
}
@@ -828,8 +840,10 @@ inc_no_nodes(void)
static void
kill_dist_ctrl_proc(void *vpid)
{
- erts_proc_sig_send_exit(NULL, (Eterm) vpid, (Eterm) vpid,
- am_kill, NIL, 0);
+ /* Send a 'kill' exit signal from init process */
+ Process *init_proc = erts_proc_lookup_raw(erts_init_process_id);
+ erts_proc_sig_send_exit(&init_proc->common, erts_init_process_id,
+ (Eterm)vpid, am_kill, NIL, 0);
}
static void
@@ -1315,7 +1329,7 @@ erts_dsig_send_unlink(ErtsDSigSendContext *ctx, Eterm local, Eterm remote, Uint6
* end up in an inconsistent state as we could before the new link
* protocol was introduced...
*/
- erts_proc_sig_send_dist_unlink_ack(ctx->c_p, ctx->dep, ctx->connection_id,
+ erts_proc_sig_send_dist_unlink_ack(ctx->dep, ctx->connection_id,
remote, local, id);
ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_UNLINK), local, remote);
}
@@ -2077,11 +2091,10 @@ int erts_net_message(Port *prt,
ASSERT(eq(ldp->proc.other.item, from));
code = erts_link_dist_insert(&ldp->dist, ede.mld);
- if (erts_proc_sig_send_link(NULL, to, &ldp->proc)) {
+ if (erts_proc_sig_send_link(NULL, from, to, &ldp->proc)) {
if (!code) {
/* Race: connection already down => send link exit */
- erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, &ldp->dist,
- am_noconnection, NIL);
+ erts_proc_sig_send_link_exit_noconnection(&ldp->dist);
}
break; /* Done */
}
@@ -2164,8 +2177,7 @@ int erts_net_message(Port *prt,
if (is_not_internal_pid(to))
goto invalid_message;
- erts_proc_sig_send_dist_unlink_ack(NULL, dep, conn_id,
- from, to, id);
+ erts_proc_sig_send_dist_unlink_ack(dep, conn_id, from, to, id);
break;
}
@@ -2221,8 +2233,10 @@ int erts_net_message(Port *prt,
break;
}
- if (erts_proc_sig_send_monitor(&mdp->u.target, pid))
+ if (erts_proc_sig_send_monitor(NULL, watcher,
+ &mdp->u.target, pid)) {
break; /* done */
+ }
/* Failed to send to local proc; cleanup reply noproc... */
@@ -2243,8 +2257,7 @@ int erts_net_message(Port *prt,
case DOP_DEMONITOR_P:
/* A remote node informs us that a local pid in no longer monitored
- We get {DOP_DEMONITOR_P, Remote pid, Local pid or name, ref},
- We need only the ref of course */
+ We get {DOP_DEMONITOR_P, Remote pid, Local pid or name, ref}. */
if (tuple_arity != 4) {
goto invalid_message;
@@ -2261,9 +2274,9 @@ int erts_net_message(Port *prt,
if (is_not_external_pid(watcher) || external_pid_dist_entry(watcher) != dep)
goto invalid_message;
- if (is_internal_pid(watched))
- erts_proc_sig_send_dist_demonitor(watched, ref);
- else if (is_external_pid(watched)
+ if (is_internal_pid(watched)) {
+ erts_proc_sig_send_dist_demonitor(watcher, watched, ref);
+ } else if (is_external_pid(watched)
&& external_pid_dist_entry(watched) == erts_this_dist_entry) {
/* old incarnation; ignore it */
;
@@ -2276,13 +2289,14 @@ int erts_net_message(Port *prt,
mon = erts_monitor_tree_lookup(ede.mld->orig_name_monitors, ref);
if (mon)
erts_monitor_tree_delete(&ede.mld->orig_name_monitors, mon);
- }
- else
+ } else {
mon = NULL;
+ }
erts_mtx_unlock(&ede.mld->mtx);
- if (mon)
- erts_proc_sig_send_demonitor(mon);
+ if (mon) {
+ erts_proc_sig_send_demonitor(NULL, watcher, 0, mon);
+ }
}
else
goto invalid_message;
@@ -2367,13 +2381,14 @@ int erts_net_message(Port *prt,
#ifdef ERTS_DIST_MSG_DBG
dist_msg_dbg(edep, "MSG", buf, orig_len);
#endif
- to = tuple[3];
- if (is_not_pid(to))
- goto invalid_message;
- rp = erts_proc_lookup(to);
+ from = tuple[2];
+ to = tuple[3];
+ if (is_not_pid(to))
+ goto invalid_message;
+ rp = erts_proc_lookup(to);
- if (rp) {
- erts_queue_dist_message(rp, 0, edep, ede_hfrag, token, am_Empty);
+ if (rp) {
+ erts_queue_dist_message(rp, 0, edep, ede_hfrag, token, from);
} else if (ede_hfrag != NULL) {
erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
free_message_buffer(ede_hfrag);
@@ -2398,12 +2413,12 @@ int erts_net_message(Port *prt,
#ifdef ERTS_DIST_MSG_DBG
dist_msg_dbg(edep, "ALIAS MSG", buf, orig_len);
#endif
-
- to = tuple[3];
- if (is_not_ref(to))
- goto invalid_message;
-
- erts_proc_sig_send_dist_to_alias(to, edep, ede_hfrag, token);
+ from = tuple[2];
+ to = tuple[3];
+ if (is_not_ref(to)) {
+ goto invalid_message;
+ }
+ erts_proc_sig_send_dist_to_alias(from, to, edep, ede_hfrag, token);
break;
case DOP_PAYLOAD_MONITOR_P_EXIT:
@@ -2841,10 +2856,8 @@ int erts_net_message(Port *prt,
ref,
dep->mld);
}
- }
- else if (lnk && !link_inserted) {
- erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, &ldp->dist,
- am_noconnection, NIL);
+ } else if (lnk && !link_inserted) {
+ erts_proc_sig_send_link_exit_noconnection(&ldp->dist);
}
break;