summaryrefslogtreecommitdiff
path: root/erts/emulator/beam/external.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/external.c')
-rw-r--r--erts/emulator/beam/external.c945
1 files changed, 620 insertions, 325 deletions
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c
index d0a2c61834..081ce23e49 100644
--- a/erts/emulator/beam/external.c
+++ b/erts/emulator/beam/external.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2022. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2023. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -47,6 +47,8 @@
#include "erl_proc_sig_queue.h"
#include "erl_trace.h"
#include "erl_global_literals.h"
+#include "erl_term_hashing.h"
+
#define PASS_THROUGH 'p'
@@ -95,8 +97,8 @@ static byte* enc_atom(ErtsAtomCacheMap *, Eterm, byte*, Uint64);
static byte* enc_pid(ErtsAtomCacheMap *, Eterm, byte*, Uint64);
struct B2TContext_t;
static const byte* dec_term(ErtsDistExternal*, ErtsHeapFactory*, const byte*, Eterm*, struct B2TContext_t*, int);
-static const byte* dec_atom(ErtsDistExternal *, const byte*, Eterm*);
-static const byte* dec_pid(ErtsDistExternal *, ErtsHeapFactory*, const byte*, Eterm*, byte tag);
+static const byte* dec_atom(ErtsDistExternal *, const byte*, Eterm*, int);
+static const byte* dec_pid(ErtsDistExternal *, ErtsHeapFactory*, const byte*, Eterm*, byte tag, int);
static Sint decoded_size(const byte *ep, const byte* endp, int internal_tags, struct B2TContext_t*);
static BIF_RETTYPE term_to_binary_trap_1(BIF_ALIST_1);
@@ -104,7 +106,6 @@ static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm
Uint64 dflags, Binary *context_b, int iovec,
Uint fragment_size);
-static Uint encode_size_struct2(ErtsAtomCacheMap *, Eterm, Uint64);
static ErtsExtSzRes encode_size_struct_int(TTBSizeContext*, ErtsAtomCacheMap *acmp,
Eterm obj, Uint64 dflags, Sint *reds, Uint *res);
@@ -126,6 +127,72 @@ void erts_init_external(void) {
return;
}
+static Uint32 local_node_hash;
+
+void erts_late_init_external(void)
+{
+ char hname[256], pidstr[21];
+ size_t hname_len, pidstr_len;
+ ErtsMonotonicTime mtime, toffs;
+ ErtsBlockHashState hstate;
+ byte *lnid;
+ Uint lnid_ix, chunk_size;
+ int res;
+
+ res = sys_get_hostname(&hname[0], sizeof(hname));
+ if (res == 0) {
+ hname_len = strlen(hname);
+ }
+ else {
+ hname[0] = '\0';
+ hname_len = 0;
+ }
+
+ sys_get_pid(&pidstr[0], sizeof(pidstr));
+ pidstr[20] = '\0';
+
+ pidstr_len = strlen(pidstr);
+
+ toffs = erts_get_time_offset();
+ mtime = erts_get_monotonic_time(NULL);
+
+ lnid = (byte *) erts_alloc(ERTS_ALC_T_TMP, 8 + hname_len + pidstr_len);
+
+ lnid_ix = 0;
+
+ /* time offset... */
+ lnid[lnid_ix++] = (byte) toffs & 0xff;
+ lnid[lnid_ix++] = (byte) (toffs >> 8) & 0xff;
+ lnid[lnid_ix++] = (byte) (toffs >> 16) & 0xff;
+ lnid[lnid_ix++] = (byte) (toffs >> 24) & 0xff;
+ lnid[lnid_ix++] = (byte) (toffs >> 32) & 0xff;
+ lnid[lnid_ix++] = (byte) (toffs >> 40) & 0xff;
+ lnid[lnid_ix++] = (byte) (toffs >> 48) & 0xff;
+ lnid[lnid_ix++] = (byte) (toffs >> 56) & 0xff;
+
+ /* hostname... */
+
+ sys_memcpy(&lnid[lnid_ix], &hname[0], hname_len);
+
+ lnid_ix += hname_len;
+
+ /* pid... */
+ memcpy(&lnid[lnid_ix], &pidstr[0], pidstr_len);
+
+ /*
+ * Use least significant 32 bits of monotonic time as initial
+ * value to hash...
+ */
+ erts_block_hash_init(&hstate, &lnid[0], lnid_ix,
+ (Uint32) (mtime & 0xffffffff));
+ chunk_size = ERTS_UINT_MAX;
+ res = erts_block_hash(&local_node_hash, &chunk_size, &hstate);
+ ASSERT(res); (void) res;
+ ASSERT(chunk_size == lnid_ix);
+
+ erts_free(ERTS_ALC_T_TMP, lnid);
+}
+
#define ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES 255
#define ERTS_DIST_HDR_ATOM_CACHE_FLAG_BYTE_IX(IIX) \
@@ -659,7 +726,7 @@ erts_encode_dist_ext_size(Eterm term,
return res;
}
-ErtsExtSzRes erts_encode_ext_size_2(Eterm term, unsigned dflags, Uint *szp)
+ErtsExtSzRes erts_encode_ext_size_2(Eterm term, Uint64 dflags, Uint *szp)
{
ErtsExtSzRes res;
*szp = 0;
@@ -675,8 +742,14 @@ ErtsExtSzRes erts_encode_ext_size(Eterm term, Uint *szp)
Uint erts_encode_ext_size_ets(Eterm term)
{
- return encode_size_struct2(NULL, term,
- TERM_TO_BINARY_DFLAGS|DFLAG_ETS_COMPRESSED);
+ ErtsExtSzRes res;
+ Uint sz = 0;
+
+ res = encode_size_struct_int(NULL, NULL, term,
+ TERM_TO_BINARY_DFLAGS|DFLAG_ETS_COMPRESSED,
+ NULL, &sz);
+ ASSERT(res == ERTS_EXT_SZ_OK); (void) res;
+ return sz;
}
@@ -1339,7 +1412,7 @@ static BIF_RETTYPE term_to_binary_trap_1(BIF_ALIST_1)
0, 0,bin, 0, ~((Uint) 0));
if (is_non_value(res)) {
if (erts_set_gc_state(BIF_P, 1)
- || MSO(BIF_P).overhead > BIN_VHEAP_SZ(BIF_P)) {
+ || MSO(BIF_P).overhead > BIF_P->bin_vheap_sz) {
ERTS_VBUMP_ALL_REDS(BIF_P);
}
if (Opts == am_undefined)
@@ -1354,7 +1427,7 @@ static BIF_RETTYPE term_to_binary_trap_1(BIF_ALIST_1)
BIF_TRAP1(&term_to_binary_trap_export,BIF_P,res);
} else {
if (erts_set_gc_state(BIF_P, 1)
- || MSO(BIF_P).overhead > BIN_VHEAP_SZ(BIF_P))
+ || MSO(BIF_P).overhead > BIF_P->bin_vheap_sz)
ERTS_BIF_YIELD_RETURN(BIF_P, res);
else
BIF_RET(res);
@@ -1400,12 +1473,12 @@ BIF_RETTYPE term_to_iovec_1(BIF_ALIST_1)
}
static ERTS_INLINE int
-parse_t2b_opts(Eterm opts, Uint *flagsp, int *levelp, int *iovecp, Uint *fsizep)
+parse_t2b_opts(Eterm opts, Uint64 *flagsp, int *levelp, int *iovecp, Uint *fsizep)
{
int level = 0;
int iovec = 0;
- Uint flags = TERM_TO_BINARY_DFLAGS;
- int deterministic = 0;
+ Uint64 flags = TERM_TO_BINARY_DFLAGS;
+ int deterministic = 0, local = 0;
Uint fsize = ~((Uint) 0); /* one fragment */
while (is_list(opts)) {
@@ -1418,17 +1491,20 @@ parse_t2b_opts(Eterm opts, Uint *flagsp, int *levelp, int *iovecp, Uint *fsizep)
iovec = !0;
} else if (arg == am_deterministic) {
deterministic = 1;
+ } else if (arg == am_local) {
+ local = !0;
} else if (is_tuple(arg) && *(tp = tuple_val(arg)) == make_arityval(2)) {
if (tp[1] == am_minor_version && is_small(tp[2])) {
switch (signed_val(tp[2])) {
case 0:
- flags = TERM_TO_BINARY_DFLAGS & ~DFLAG_NEW_FLOATS;
+ flags = (TERM_TO_BINARY_DFLAGS
+ & ~(DFLAG_NEW_FLOATS | DFLAG_UTF8_ATOMS));
break;
case 1: /* Current default... */
- flags = TERM_TO_BINARY_DFLAGS;
+ flags = TERM_TO_BINARY_DFLAGS & ~DFLAG_UTF8_ATOMS;
break;
case 2:
- flags = TERM_TO_BINARY_DFLAGS | DFLAG_UTF8_ATOMS;
+ flags = TERM_TO_BINARY_DFLAGS;
break;
default:
return 0; /* badarg */
@@ -1459,6 +1535,14 @@ parse_t2b_opts(Eterm opts, Uint *flagsp, int *levelp, int *iovecp, Uint *fsizep)
return 0; /* badarg */
}
+ if (deterministic && local) {
+ return 0; /* badarg */
+ }
+
+ if (local) {
+ flags |= DFLAG_LOCAL_EXT;
+ }
+
if (deterministic) {
flags |= DFLAG_DETERMINISTIC;
}
@@ -1476,7 +1560,7 @@ parse_t2b_opts(Eterm opts, Uint *flagsp, int *levelp, int *iovecp, Uint *fsizep)
BIF_RETTYPE term_to_binary_2(BIF_ALIST_2)
{
int level;
- Uint flags;
+ Uint64 flags;
Eterm res;
if (!parse_t2b_opts(BIF_ARG_2, &flags, &level, NULL, NULL)) {
@@ -1503,7 +1587,7 @@ BIF_RETTYPE term_to_binary_2(BIF_ALIST_2)
BIF_RETTYPE term_to_iovec_2(BIF_ALIST_2)
{
int level;
- Uint flags;
+ Uint64 flags;
Eterm res;
if (!parse_t2b_opts(BIF_ARG_2, &flags, &level, NULL, NULL)) {
@@ -1532,7 +1616,7 @@ erts_debug_term_to_binary(Process *p, Eterm term, Eterm opts)
{
Eterm ret;
int level, iovec;
- Uint flags;
+ Uint64 flags;
Uint fsize;
if (!parse_t2b_opts(opts, &flags, &level, &iovec, &fsize)) {
@@ -1579,9 +1663,12 @@ enum B2TState { /* order is somewhat significant */
typedef struct {
Sint heap_size;
- int terms;
- const byte* ep;
+ Uint32 terms;
int atom_extra_skip;
+ const byte* ep;
+ ErtsBlockHashState lext_state;
+ const byte *lext_hash;
+ Uint32 lext_term_end;
} B2TSizeContext;
typedef struct {
@@ -1590,6 +1677,7 @@ typedef struct {
Eterm* next;
ErtsHeapFactory factory;
int remaining_n;
+ int internal_nc;
char* remaining_bytes;
ErtsPStack map_array;
} B2TDecodeContext;
@@ -1819,7 +1907,6 @@ static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1)
return binary_to_term_int(BIF_P, THE_NON_VALUE, ERTS_MAGIC_BIN_DATA(context_bin));
}
-
#define B2T_BYTES_PER_REDUCTION 128
#define B2T_MEMCPY_FACTOR 8
@@ -1946,6 +2033,7 @@ static BIF_RETTYPE binary_to_term_int(Process* p, Eterm bin, B2TContext *ctx)
ctx->u.dc.ep = ctx->b2ts.extp;
ctx->u.dc.res = (Eterm) (UWord) NULL;
ctx->u.dc.next = &ctx->u.dc.res;
+ ctx->u.dc.internal_nc = 0;
erts_factory_proc_prealloc_init(&ctx->u.dc.factory, p, ctx->heap_size);
ctx->u.dc.map_array.pstart = NULL;
ctx->state = B2TDecode;
@@ -2117,34 +2205,11 @@ Eterm
external_size_2(BIF_ALIST_2)
{
Uint size = 0;
- Uint flags = TERM_TO_BINARY_DFLAGS;
-
- while (is_list(BIF_ARG_2)) {
- Eterm arg = CAR(list_val(BIF_ARG_2));
- Eterm* tp;
+ int level;
+ Uint64 flags;
- if (is_tuple(arg) && *(tp = tuple_val(arg)) == make_arityval(2)) {
- if (tp[1] == am_minor_version && is_small(tp[2])) {
- switch (signed_val(tp[2])) {
- case 0:
- flags &= ~DFLAG_NEW_FLOATS;
- break;
- case 1:
- break;
- default:
- goto error;
- }
- } else {
- goto error;
- }
- } else {
- error:
- BIF_ERROR(BIF_P, BADARG);
- }
- BIF_ARG_2 = CDR(list_val(BIF_ARG_2));
- }
- if (is_not_nil(BIF_ARG_2)) {
- goto error;
+ if (!parse_t2b_opts(BIF_ARG_2, &flags, &level, NULL, NULL)) {
+ BIF_ERROR(BIF_P, BADARG);
}
switch (erts_encode_ext_size_2(BIF_ARG_1, flags, &size)) {
@@ -2816,45 +2881,80 @@ enc_atom(ErtsAtomCacheMap *acmp, Eterm atom, byte *ep, Uint64 dflags)
}
/*
- * We use this atom as sysname in local pid/port/refs
- * for the ETS compressed format
+ * We use INTERNAL_LOCAL_SYSNAME to mark internal node for pid/port/refs
+ * in the ETS compressed format and local format.
*
*/
-#define INTERNAL_LOCAL_SYSNAME am_ErtsSecretAtom
+#define INTERNAL_LOCAL_SYSNAME NIL
-static byte*
-enc_pid(ErtsAtomCacheMap *acmp, Eterm pid, byte* ep, Uint64 dflags)
+static ERTS_INLINE byte *
+enc_internal_pid(ErtsAtomCacheMap *acmp, Eterm pid, byte* ep, Uint64 dflags)
{
- Uint on, os;
- Eterm sysname = ((is_internal_pid(pid) && (dflags & DFLAG_ETS_COMPRESSED))
- ? INTERNAL_LOCAL_SYSNAME : pid_node_name(pid));
- Uint32 creation = pid_creation(pid);
+ Uint32 number, serial, creation;
- *ep++ = NEW_PID_EXT;
+ ASSERT(is_internal_pid(pid));
- ep = enc_atom(acmp, sysname, ep, dflags);
+ *ep++ = NEW_PID_EXT;
- if (is_internal_pid(pid)) {
- on = internal_pid_number(pid);
- os = internal_pid_serial(pid);
+ number = internal_pid_number(pid);
+ serial = internal_pid_serial(pid);
+ if (dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT)) {
+ *ep++ = NIL_EXT; /* INTERNAL_LOCAL_NODE */
+ creation = 0;
}
else {
- on = external_pid_number(pid);
- os = external_pid_serial(pid);
+ Eterm sysname = internal_pid_node_name(pid);
+ creation = internal_pid_creation(pid);
+ ep = enc_atom(acmp, sysname, ep, dflags);
}
- put_int32(on, ep);
+ put_int32(number, ep);
+ ep += 4;
+ put_int32(serial, ep);
+ ep += 4;
+ put_int32(creation, ep);
+ ep += 4;
+ return ep;
+
+}
+
+static ERTS_INLINE byte *
+enc_external_pid(ErtsAtomCacheMap *acmp, Eterm pid, byte* ep, Uint64 dflags)
+{
+ Uint32 number, serial, creation;
+ Eterm sysname;
+
+ ASSERT(is_external_pid(pid));
+
+ *ep++ = NEW_PID_EXT;
+
+ ASSERT(is_external_pid(pid));
+ number = external_pid_number(pid);
+ serial = external_pid_serial(pid);
+ sysname = external_pid_node_name(pid);
+ creation = external_pid_creation(pid);
+ ep = enc_atom(acmp, sysname, ep, dflags);
+
+ put_int32(number, ep);
ep += 4;
- put_int32(os, ep);
+ put_int32(serial, ep);
ep += 4;
put_int32(creation, ep);
ep += 4;
return ep;
}
+static byte*
+enc_pid(ErtsAtomCacheMap *acmp, Eterm pid, byte* ep, Uint64 dflags)
+{
+ if (is_internal_pid(pid))
+ return enc_internal_pid(acmp, pid, ep, dflags);
+ return enc_external_pid(acmp, pid, ep, dflags);
+}
+
/* Expect an atom in plain text or cached */
static const byte*
-dec_atom(ErtsDistExternal *edep, const byte* ep, Eterm* objp)
+dec_atom(ErtsDistExternal *edep, const byte* ep, Eterm* objp, int internal_nc)
{
Uint len;
int n;
@@ -2919,7 +3019,12 @@ dec_atom(ErtsDistExternal *edep, const byte* ep, Eterm* objp)
}
*objp = make_atom(n);
break;
-
+ case NIL_EXT:
+ if (internal_nc) {
+ *objp = INTERNAL_LOCAL_SYSNAME;
+ break;
+ }
+ /* else: fail... */
default:
error:
*objp = NIL; /* Don't leave a hole in the heap */
@@ -2947,7 +3052,7 @@ static ERTS_INLINE ErlNode* dec_get_node(Eterm sysname, Uint32 creation, Eterm b
static const byte*
dec_pid(ErtsDistExternal *edep, ErtsHeapFactory* factory, const byte* ep,
- Eterm* objp, byte tag)
+ Eterm* objp, byte tag, int internal_nc)
{
Eterm sysname;
Uint data;
@@ -2958,7 +3063,7 @@ dec_pid(ErtsDistExternal *edep, ErtsHeapFactory* factory, const byte* ep,
*objp = NIL; /* In case we fail, don't leave a hole in the heap */
/* eat first atom */
- if ((ep = dec_atom(edep, ep, &sysname)) == NULL)
+ if ((ep = dec_atom(edep, ep, &sysname, internal_nc)) == NULL)
return NULL;
num = get_uint32(ep);
ep += 4;
@@ -3071,6 +3176,7 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep,
FloatDef f;
register Sint r = 0;
int use_iov = 0;
+ byte *lext_hash = NULL; /* initialize to avoid faulty warning... */
/* The following variables are only used during encoding of
* a map when the `deterministic` option is active. */
@@ -3088,10 +3194,39 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep,
obj = ctx->obj;
map_array = ctx->map_array;
next_map_element = ctx->next_map_element;
+ lext_hash = ctx->lext_hash;
if (is_non_value(obj)) {
goto outer_loop;
}
+ else {
+ goto L_jump_start;
+ }
}
+ if (ctx->continue_make_lext_hash) {
+ lext_hash = ctx->lext_hash;
+ ep = ctx->ep;
+ if (use_iov) {
+ goto continue_make_lext_hash_iov;
+ }
+ else {
+ goto continue_make_lext_hash_bin;
+ }
+ }
+ }
+
+ /* We only pass here once at the start of the encoding... */
+ if (dflags & DFLAG_LOCAL_EXT) {
+ *ep++ = LOCAL_EXT;
+ lext_hash = ep;
+ if (ctx)
+ ctx->lext_hash = ep;
+ ep += 4; /* 32-bit hash */
+ if (use_iov) {
+ ASSERT(ctx);
+ store_in_vec(ctx, ep, NULL, THE_NON_VALUE, NULL, 0);
+ /* current vlen is now where to start calculating the hash */
+ ctx->lext_vlen = ctx->vlen;
+ }
}
goto L_jump_start;
@@ -3210,7 +3345,6 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep,
long num_reductions = r;
n = next_map_element - map_array;
- ASSERT(n > MAP_SMALL_MAP_LIMIT);
if (ctx == NULL) {
/* No context means that the external representation of term
* being encoded will fit in a heap binary (64 bytes). This
@@ -3375,54 +3509,89 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep,
}
break;
- case PID_DEF:
case EXTERNAL_PID_DEF:
- ep = enc_pid(acmp, obj, ep, dflags);
+ ep = enc_external_pid(acmp, obj, ep, dflags);
+ break;
+
+ case PID_DEF:
+ ep = enc_internal_pid(acmp, obj, ep, dflags);
break;
- case REF_DEF:
case EXTERNAL_REF_DEF: {
+ Eterm sysname;
Uint32 *ref_num;
- Eterm sysname = (((dflags & DFLAG_ETS_COMPRESSED) && is_internal_ref(obj))
- ? INTERNAL_LOCAL_SYSNAME : ref_node_name(obj));
- Uint32 creation = ref_creation(obj);
+ Uint32 creation;
- erts_magic_ref_save_bin(obj);
+ *ep++ = NEWER_REFERENCE_EXT;
+
+ ref_num = external_ref_numbers(obj);
+ i = external_ref_no_numbers(obj);
+ put_int16(i, ep);
+ ep += 2;
+
+ sysname = external_ref_node_name(obj);
+ creation = external_ref_creation(obj);
+ goto ref_common;
+
+ case REF_DEF:
*ep++ = NEWER_REFERENCE_EXT;
- i = ref_no_numbers(obj);
- put_int16(i, ep);
- ep += 2;
- ep = enc_atom(acmp, sysname, ep, dflags);
+
+ erts_magic_ref_save_bin(obj);
+
+ ref_num = internal_ref_numbers(obj);
+ i = internal_ref_no_numbers(obj);
+ put_int16(i, ep);
+ ep += 2;
+
+ if (dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT)) {
+ *ep++ = NIL_EXT; /* INTERNAL_LOCAL_NODE */
+ creation = 0;
+ }
+ else {
+ sysname = internal_ref_node_name(obj);
+ creation = internal_ref_creation(obj);
+ ref_common:
+ ep = enc_atom(acmp, sysname, ep, dflags);
+ }
+
put_int32(creation, ep);
ep += 4;
- ref_num = ref_numbers(obj);
for (j = 0; j < i; j++) {
put_int32(ref_num[j], ep);
ep += 4;
}
break;
}
- case PORT_DEF:
case EXTERNAL_PORT_DEF: {
- Eterm sysname = (((dflags & DFLAG_ETS_COMPRESSED) && is_internal_port(obj))
- ? INTERNAL_LOCAL_SYSNAME : port_node_name(obj));
- Uint32 creation = port_creation(obj);
- byte *tagp = ep++;
- Uint64 num;
-
- ep = enc_atom(acmp, sysname, ep, dflags);
- num = port_number(obj);
- if (num > ERTS_MAX_V3_PORT_NUMBER) {
- *tagp = V4_PORT_EXT;
- put_int64(num, ep);
- ep += 8;
- }
- else {
- *tagp = NEW_PORT_EXT;
- put_int32(num, ep);
- ep += 4;
- }
+ Eterm sysname;
+ Uint64 number;
+ Uint32 creation;
+
+ *ep++ = V4_PORT_EXT;
+ number = external_port_number(obj);
+ sysname = external_port_node_name(obj);
+ creation = external_port_creation(obj);
+
+ goto port_common;
+
+ case PORT_DEF:
+
+ *ep++ = V4_PORT_EXT;
+ number = internal_port_number(obj);
+ if (dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT)) {
+ *ep++ = NIL_EXT; /* INTERNAL_LOCAL_NODE */
+ creation = 0;
+ }
+ else {
+ sysname = internal_port_node_name(obj);
+ creation = internal_port_creation(obj);
+ port_common:
+ ep = enc_atom(acmp, sysname, ep, dflags);
+ }
+
+ put_int64(number, ep);
+ ep += 8;
put_int32(creation, ep);
ep += 4;
break;
@@ -3481,9 +3650,20 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep,
Eterm *kptr = flatmap_get_keys(mp);
Eterm *vptr = flatmap_get_values(mp);
- WSTACK_PUSH4(s, (UWord)kptr, (UWord)vptr,
- ENC_MAP_PAIR, size);
- }
+ if (dflags & DFLAG_DETERMINISTIC) {
+ ASSERT(map_array == NULL);
+ next_map_element = map_array = alloc_map_array(size);
+ while (size--) {
+ *next_map_element++ = *kptr++;
+ *next_map_element++ = *vptr++;
+ }
+ WSTACK_PUSH2(s, ENC_START_SORTING_MAP, THE_NON_VALUE);
+ }
+ else {
+ WSTACK_PUSH4(s, (UWord)kptr, (UWord)vptr,
+ ENC_MAP_PAIR, size);
+ }
+ }
} else {
Eterm hdr;
Uint node_sz;
@@ -3741,6 +3921,68 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep,
if (use_iov)
store_in_vec(ctx, ep, NULL, THE_NON_VALUE, NULL, 0);
}
+ if (dflags & DFLAG_LOCAL_EXT) {
+ int done;
+ Uint32 hash;
+ Uint chunk_len;
+
+ if (use_iov) {
+ ASSERT(ctx);
+ erts_iov_block_hash_init(&ctx->lext_state.iov_block,
+ &ctx->iov[ctx->lext_vlen],
+ ctx->vlen - ctx->lext_vlen,
+ local_node_hash);
+
+ continue_make_lext_hash_iov:
+ /* Do 128 bytes per reduction... */
+ chunk_len = (Uint) r*128;
+
+ done = erts_iov_block_hash(&hash, &chunk_len,
+ &ctx->lext_state.iov_block);
+ }
+ else {
+ ErtsBlockHashState lext_state_buf, *lext_state;
+ byte *ep_start = lext_hash + 4 /* 32 bit hash value */;
+ Sint len = ep - ep_start;
+
+ ASSERT(len >= 0);
+
+ lext_state = ctx ? &ctx->lext_state.block : &lext_state_buf;
+
+ erts_block_hash_init(lext_state, ep_start, len, local_node_hash);
+
+ if (!ctx) {
+ /* Do it all at once... */
+ chunk_len = ERTS_UINT_MAX;
+ }
+ else {
+ continue_make_lext_hash_bin:
+ lext_state = &ctx->lext_state.block;
+ /* Do 128 bytes per reduction... */
+ chunk_len = (Uint) r*128;
+ }
+
+ done = erts_block_hash(&hash, &chunk_len, lext_state);
+ }
+
+ if (!ctx) {
+ ASSERT(done);
+ }
+ else {
+ if (!done) {
+ /* yield; more work calculating hash... */
+ ctx->ep = ep;
+ ctx->continue_make_lext_hash = !0;
+ *reds = 0;
+ return -1;
+ }
+ r -= chunk_len/128;
+ *reds = r;
+ }
+
+ put_int32(hash, lext_hash);
+ }
+
*res = ep;
return 0;
}
@@ -3924,7 +4166,7 @@ dec_term(ErtsDistExternal *edep,
{
#define PSTACK_TYPE struct dec_term_map
PSTACK_DECLARE(map_array, 10);
- int n;
+ int n, internal_nc = ets_decode;
ErtsAtomEncoding char_enc;
register Eterm* hp; /* Please don't take the address of hp */
Eterm* next;
@@ -3938,6 +4180,7 @@ dec_term(ErtsDistExternal *edep,
next = ctx->u.dc.next;
ep = ctx->u.dc.ep;
factory = &ctx->u.dc.factory;
+ internal_nc = ctx->u.dc.internal_nc;
if (ctx->state != B2TDecode) {
int n_limit = reds;
@@ -4029,6 +4272,8 @@ dec_term(ErtsDistExternal *edep,
objp = next;
next = (Eterm *) *objp;
+ continue_this_obj:
+
switch (*ep++) {
case INTEGER_EXT:
{
@@ -4272,7 +4517,7 @@ dec_term_atom_common:
case PID_EXT:
case NEW_PID_EXT:
factory->hp = hp;
- ep = dec_pid(edep, factory, ep, objp, ep[-1]);
+ ep = dec_pid(edep, factory, ep, objp, ep[-1], internal_nc);
hp = factory->hp;
if (ep == NULL) {
goto error;
@@ -4288,7 +4533,7 @@ dec_term_atom_common:
Uint32 cre;
byte tag = ep[-1];
- if ((ep = dec_atom(edep, ep, &sysname)) == NULL) {
+ if ((ep = dec_atom(edep, ep, &sysname, internal_nc)) == NULL) {
goto error;
}
if (tag == V4_PORT_EXT) {
@@ -4348,7 +4593,7 @@ dec_term_atom_common:
ref_words = 1;
- if ((ep = dec_atom(edep, ep, &sysname)) == NULL)
+ if ((ep = dec_atom(edep, ep, &sysname, internal_nc)) == NULL)
goto error;
if ((r0 = get_int32(ep)) >= MAX_REFERENCE )
goto error;
@@ -4365,7 +4610,7 @@ dec_term_atom_common:
ref_words = get_int16(ep);
ep += 2;
- if ((ep = dec_atom(edep, ep, &sysname)) == NULL)
+ if ((ep = dec_atom(edep, ep, &sysname, internal_nc)) == NULL)
goto error;
cre = get_int8(ep);
@@ -4383,7 +4628,7 @@ dec_term_atom_common:
ref_words = get_int16(ep);
ep += 2;
- if ((ep = dec_atom(edep, ep, &sysname)) == NULL)
+ if ((ep = dec_atom(edep, ep, &sysname, internal_nc)) == NULL)
goto error;
cre = get_int32(ep);
@@ -4416,7 +4661,7 @@ dec_term_atom_common:
}
if (ref_words != ERTS_REF_NUMBERS) {
int i;
- if (ref_words > ERTS_REF_NUMBERS)
+ if (ref_words > ERTS_MAX_REF_NUMBERS)
goto error; /* Not a ref that we created... */
for (i = ref_words; i < ERTS_REF_NUMBERS; i++)
ref_num[i] = 0;
@@ -4428,12 +4673,15 @@ dec_term_atom_common:
}
else {
/* Check if it is a pid reference... */
- Eterm pid = erts_pid_ref_lookup(ref_num);
+ Eterm pid = erts_pid_ref_lookup(ref_num, ref_words);
if (is_internal_pid(pid)) {
write_pid_ref_thing(hp, ref_num[0], ref_num[1],
ref_num[2], pid);
hp += ERTS_PID_REF_THING_SIZE;
}
+ else if (is_non_value(pid)) {
+ goto error; /* invalid reference... */
+ }
else {
/* Check if it is a magic reference... */
ErtsMagicBinary *mb = erts_magic_ref_lookup_bin(ref_num);
@@ -4618,10 +4866,10 @@ dec_term_atom_common:
Eterm temp;
Sint arity;
- if ((ep = dec_atom(edep, ep, &mod)) == NULL) {
+ if ((ep = dec_atom(edep, ep, &mod, 0)) == NULL) {
goto error;
}
- if ((ep = dec_atom(edep, ep, &name)) == NULL) {
+ if ((ep = dec_atom(edep, ep, &name, 0)) == NULL) {
goto error;
}
factory->hp = hp;
@@ -4736,7 +4984,7 @@ dec_term_atom_common:
*objp = make_fun(funp);
/* Module */
- if ((ep = dec_atom(edep, ep, &module)) == NULL) {
+ if ((ep = dec_atom(edep, ep, &module, 0)) == NULL) {
goto error;
}
factory->hp = hp;
@@ -4849,6 +5097,13 @@ dec_term_atom_common:
break;
}
+ case LOCAL_EXT:
+ internal_nc = !0;
+ if (ctx)
+ ctx->u.dc.internal_nc = !0;
+ ep += 4; /* 32-bit hash (verified in decoded_size()) */
+ goto continue_this_obj;
+
default:
goto error;
}
@@ -4949,23 +5204,88 @@ error_map_fixup:
return NULL;
}
-/* returns the number of bytes needed to encode an object
- to a sequence of bytes
- N.B. That this must agree with to_external2() above!!!
- (except for cached atoms) */
-static Uint encode_size_struct2(ErtsAtomCacheMap *acmp,
- Eterm obj,
- Uint64 dflags) {
- Uint size = 0;
- ErtsExtSzRes res = encode_size_struct_int(NULL, acmp, obj,
- dflags, NULL,
- &size);
- /*
- * encode_size_struct2() only allowed when
- * we know the result will always be OK!
- */
- ASSERT(res == ERTS_EXT_SZ_OK); (void) res;
- return (Uint) size;
+static Uint
+encode_atom_size(ErtsAtomCacheMap *acmp, Eterm atom, Uint64 dflags)
+{
+ ASSERT(is_atom(atom));
+ if (dflags & DFLAG_ETS_COMPRESSED) {
+ if (atom_val(atom) >= (1<<16)) {
+ return (Uint) 1 + 3;
+ }
+ else {
+ return (Uint) 1 + 2;
+ }
+ }
+ else {
+ Atom *a = atom_tab(atom_val(atom));
+ int alen;
+ Uint result;
+ if ((dflags & DFLAG_UTF8_ATOMS) || a->latin1_chars < 0) {
+ alen = a->len;
+ result = (Uint) 1 + 1 + alen;
+ if (alen > 255) {
+ result++; /* ATOM_UTF8_EXT (not small) */
+ }
+ }
+ else {
+ alen = a->latin1_chars;
+ result = (Uint) 1 + 1 + alen;
+ if (alen > 255 || !(dflags & DFLAG_SMALL_ATOM_TAGS))
+ result++; /* ATOM_EXT (not small) */
+ }
+ insert_acache_map(acmp, atom, dflags);
+ return result;
+ }
+}
+
+static Uint
+encode_internal_pid_size(ErtsAtomCacheMap *acmp, Eterm pid, Uint64 dflags)
+{
+ int nlen;
+ ASSERT(is_internal_pid(pid));
+ nlen = ((dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT))
+ ? 1
+ : encode_atom_size(acmp, internal_pid_node_name(pid), dflags));
+ return (Uint) 1 + nlen + 4 + 4 + 4;
+}
+
+static Uint
+encode_external_pid_size(ErtsAtomCacheMap *acmp, Eterm pid, Uint64 dflags)
+{
+ int nlen;
+ ASSERT(is_external_pid(pid));
+ nlen = encode_atom_size(acmp, external_pid_node_name(pid), dflags);
+ return (Uint) 1 + nlen + 4 + 4 + 4;
+}
+
+static Uint
+encode_pid_size(ErtsAtomCacheMap *acmp, Eterm pid, Uint64 dflags)
+{
+ if (is_internal_pid(pid))
+ return encode_internal_pid_size(acmp, pid, dflags);
+ ASSERT(is_external_pid(pid));
+ return encode_external_pid_size(acmp, pid, dflags);
+}
+
+static Uint
+encode_small_size(ErtsAtomCacheMap *acmp, Eterm pid, Uint64 dflags)
+{
+ Sint val = signed_val(pid);
+ Uint result;
+
+ if ((Uint)val < 256)
+ result = (Uint) 1 + 1; /* SMALL_INTEGER_EXT */
+ else if (sizeof(Sint) == 4 || IS_SSMALL32(val))
+ result = (Uint) 1 + 4; /* INTEGER_EXT */
+ else {
+ int i;
+ DeclareTmpHeapNoproc(tmp_big,2);
+ UseTmpHeapNoproc(2);
+ i = big_bytes(small_to_big(val, tmp_big));
+ result = (Uint) 1 + 1 + 1 + i; /* SMALL_BIG_EXT */
+ UnUseTmpHeapNoproc(2);
+ }
+ return result;
}
static ErtsExtSzRes
@@ -4978,14 +5298,24 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj,
Sint r = 0;
int vlen = -1;
- if (ctx) {
+ if (!ctx) {
+ if (dflags & DFLAG_LOCAL_EXT)
+ result += 5;
+ }
+ else {
WSTACK_CHANGE_ALLOCATOR(s, ERTS_ALC_T_SAVED_ESTACK);
r = *reds;
vlen = ctx->vlen;
- if (!ctx->wstack.wstart)
+ if (!ctx->wstack.wstart) {
ctx->last_result = result;
+ if (dflags & DFLAG_LOCAL_EXT) {
+ result += 5;
+ if (vlen >= 0)
+ vlen++;
+ }
+ }
else { /* restore saved stack */
WSTACK_RESTORE(s, &ctx->wstack);
result = ctx->result;
@@ -5014,49 +5344,10 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj,
result++;
break;
case ATOM_DEF:
- if (dflags & DFLAG_ETS_COMPRESSED) {
- if (atom_val(obj) >= (1<<16)) {
- result += 1 + 3;
- }
- else {
- result += 1 + 2;
- }
- }
- else {
- Atom *a = atom_tab(atom_val(obj));
- int alen;
- if ((dflags & DFLAG_UTF8_ATOMS) || a->latin1_chars < 0) {
- alen = a->len;
- result += 1 + 1 + alen;
- if (alen > 255) {
- result++; /* ATOM_UTF8_EXT (not small) */
- }
- }
- else {
- alen = a->latin1_chars;
- result += 1 + 1 + alen;
- if (alen > 255 || !(dflags & DFLAG_SMALL_ATOM_TAGS))
- result++; /* ATOM_EXT (not small) */
- }
- insert_acache_map(acmp, obj, dflags);
- }
+ result += encode_atom_size(acmp, obj, dflags);
break;
case SMALL_DEF:
- {
- Sint val = signed_val(obj);
-
- if ((Uint)val < 256)
- result += 1 + 1; /* SMALL_INTEGER_EXT */
- else if (sizeof(Sint) == 4 || IS_SSMALL32(val))
- result += 1 + 4; /* INTEGER_EXT */
- else {
- DeclareTmpHeapNoproc(tmp_big,2);
- UseTmpHeapNoproc(2);
- i = big_bytes(small_to_big(val, tmp_big));
- result += 1 + 1 + 1 + i; /* SMALL_BIG_EXT */
- UnUseTmpHeapNoproc(2);
- }
- }
+ result += encode_small_size(acmp, obj, dflags);
break;
case BIG_DEF:
i = big_bytes(obj);
@@ -5068,22 +5359,47 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj,
result += 1 + 4 + 1 + i; /* tag,size,sign,digits */
break;
case EXTERNAL_PID_DEF:
+ result += encode_external_pid_size(acmp, obj, dflags);
+ break;
case PID_DEF:
- result += (1 + encode_size_struct2(acmp, pid_node_name(obj), dflags) +
- 4 + 4 + 4);
+ result += encode_internal_pid_size(acmp, obj, dflags);
+ break;
+ case EXTERNAL_REF_DEF: {
+ int nlen = encode_atom_size(acmp,
+ external_ref_node_name(obj),
+ dflags);
+ i = external_ref_no_numbers(obj);
+ result += (1 + 2 + nlen + 4 + 4*i);
break;
- case EXTERNAL_REF_DEF:
- case REF_DEF:
- i = ref_no_numbers(obj);
- result += (1 + 2 + encode_size_struct2(acmp, ref_node_name(obj), dflags) +
- 4 + 4*i);
+ }
+ case REF_DEF: {
+ int nlen;
+ i = internal_ref_no_numbers(obj);
+ if (dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT)) {
+ nlen = 1;
+ }
+ else {
+ nlen = encode_atom_size(acmp,
+ internal_ref_node_name(obj),
+ dflags);
+ }
+ result += (1 + 2 + nlen + 4 + 4*i);
+ break;
+ }
+ case EXTERNAL_PORT_DEF: {
+ int nlen = encode_atom_size(acmp,
+ external_port_node_name(obj),
+ dflags);
+ result += (1 + nlen + 8 + 4);
break;
- case EXTERNAL_PORT_DEF:
+ }
case PORT_DEF: {
- Uint64 num = port_number(obj);
- result += (num > ERTS_MAX_V3_PORT_NUMBER) ? 8 : 4;
- result += (1 + encode_size_struct2(acmp, port_node_name(obj), dflags)
- /* num */ + 4);
+ int nlen = ((dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT))
+ ? 1
+ : encode_atom_size(acmp,
+ internal_port_node_name(obj),
+ dflags));
+ result += (1 + nlen + 8 + 4);
break;
}
case LIST_DEF: {
@@ -5270,8 +5586,8 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj,
if (is_local_fun(funp)) {
result += 20+1+1+4; /* New ID + Tag */
result += 4; /* Length field (number of free variables */
- result += encode_size_struct2(acmp, funp->creator, dflags);
- result += encode_size_struct2(acmp, funp->entry.fun->module, dflags);
+ result += encode_pid_size(acmp, funp->creator, dflags);
+ result += encode_atom_size(acmp, funp->entry.fun->module, dflags);
result += 2 * (1+4); /* Index, Uniq */
if (funp->num_free > 1) {
WSTACK_PUSH2(s, (UWord) (funp->env + 1),
@@ -5287,9 +5603,9 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj,
ASSERT(is_external_fun(funp) && funp->next == NULL);
result += 1;
- result += encode_size_struct2(acmp, ep->info.mfa.module, dflags);
- result += encode_size_struct2(acmp, ep->info.mfa.function, dflags);
- result += encode_size_struct2(acmp, make_small(ep->info.mfa.arity), dflags);
+ result += encode_atom_size(acmp, ep->info.mfa.module, dflags);
+ result += encode_atom_size(acmp, ep->info.mfa.function, dflags);
+ result += encode_small_size(acmp, make_small(ep->info.mfa.arity), dflags);
}
break;
}
@@ -5345,8 +5661,6 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj,
return ERTS_EXT_SZ_OK;
}
-
-
static Sint
decoded_size(const byte *ep, const byte* endp, int internal_tags, B2TContext* ctx)
{
@@ -5354,6 +5668,8 @@ decoded_size(const byte *ep, const byte* endp, int internal_tags, B2TContext* ct
int atom_extra_skip;
Uint n;
SWord reds;
+ const byte *lext_hash;
+ Uint32 lext_term_end;
/* Keep track of the current number of sub terms remaining to be decoded.
*
@@ -5366,24 +5682,6 @@ decoded_size(const byte *ep, const byte* endp, int internal_tags, B2TContext* ct
*/
Uint32 terms;
- if (ctx) {
- reds = ctx->reds;
- if (ctx->u.sc.ep) {
- heap_size = ctx->u.sc.heap_size;
- terms = ctx->u.sc.terms;
- ep = ctx->u.sc.ep;
- atom_extra_skip = ctx->u.sc.atom_extra_skip;
- goto init_done;
- }
- }
- else
- ERTS_UNDEF(reds, 0);
-
- heap_size = 0;
- terms = 1;
- atom_extra_skip = 0;
-init_done:
-
#define SKIP(sz) \
do { \
if ((sz) <= endp-ep) { \
@@ -5413,6 +5711,38 @@ init_done:
if (terms < before) goto error; \
} while (0)
+
+ if (ctx) {
+ reds = ctx->reds;
+ if (ctx->u.sc.ep) {
+ ep = ctx->u.sc.ep;
+ atom_extra_skip = ctx->u.sc.atom_extra_skip;
+ heap_size = ctx->u.sc.heap_size;
+ lext_hash = ctx->u.sc.lext_hash;
+ lext_term_end = ctx->u.sc.lext_term_end;
+ terms = ctx->u.sc.terms;
+ if (terms == lext_term_end) {
+ ASSERT(lext_hash);
+ goto continue_check_lext;
+ }
+ goto init_done;
+ }
+ }
+ else
+ ERTS_UNDEF(reds, 0);
+
+ heap_size = 0;
+ terms = 1;
+ atom_extra_skip = 0;
+ /*
+ * lext_hash != NULL and local_term_end != ~0 when decoding local external
+ * term format...
+ */
+ lext_hash = NULL;
+ lext_term_end = ~0;
+
+init_done:
+
ASSERT(terms > 0);
do {
int tag;
@@ -5541,6 +5871,18 @@ init_done:
terms++;
break;
case NIL_EXT:
+ if (atom_extra_skip) {
+ /*
+ * atom_extra_skip != 0 should only appear due to local encoding,
+ * or compressed ets encoding, of a node name in internal
+ * pids/ports/refs. If not currently inside a local encoding,
+ * this is an error...
+ */
+ if (!lext_hash && !internal_tags)
+ goto error;
+ SKIP(atom_extra_skip);
+ atom_extra_skip = 0;
+ }
break;
case LIST_EXT:
CHKSIZE(4);
@@ -5687,22 +6029,87 @@ init_done:
SKIP(2+sizeof(ProcBin));
heap_size += PROC_BIN_SIZE + ERL_SUB_BIN_SIZE;
break;
+ CHKSIZE(1);
+ case LOCAL_EXT:
+ /*
+ * Currently the hash is 4 bytes large...
+ */
+ CHKSIZE(4);
+ lext_hash = ep;
+ lext_term_end = terms - 1;
+ terms++;
+ ep += 4;
+ break;
default:
goto error;
}
terms--;
+ if (terms == lext_term_end) {
+ ErtsBlockHashState lext_state_buf, *lext_state;
+ const byte *ep_start = lext_hash + 4 /* 32 bit hash value */;
+ Sint len = ep - ep_start;
+ Uint chunk_len;
+ Uint32 hash;
+
+ ASSERT(lext_hash);
+
+ if (len <= 0) {
+ /* no terms */
+ goto error;
+ }
+
+ lext_state = ctx ? &ctx->u.sc.lext_state : &lext_state_buf;
+
+ erts_block_hash_init(lext_state, ep_start, len, local_node_hash);
+
+ if (!ctx) {
+ /* Do it all at once... */
+ chunk_len = ERTS_UINT_MAX;
+ }
+ else {
+ continue_check_lext:
+ lext_state = &ctx->u.sc.lext_state;
+ /* 'reds' has been scaled up to "bytes per reds"... */
+ chunk_len = (Uint) reds;
+ }
+
+ if (!erts_block_hash(&hash, &chunk_len, lext_state)) {
+ /* yield; more work calculating hash... */
+ ASSERT(ctx);
+ reds = 0;
+ }
+ else {
+ reds -= chunk_len;
+ if (hash != get_int32(lext_hash)) {
+ /*
+ * Hash presented in external format did not match the
+ * calculated hash...
+ */
+ goto error;
+ }
+ lext_hash = NULL;
+ lext_term_end = ~0;
+ }
+
+ }
+
+ ASSERT(terms != ~0);
+
if (ctx && --reds <= 0 && terms != 0) {
ctx->u.sc.heap_size = heap_size;
ctx->u.sc.terms = terms;
ctx->u.sc.ep = ep;
ctx->u.sc.atom_extra_skip = atom_extra_skip;
+ ctx->u.sc.lext_hash = lext_hash;
+ ctx->u.sc.lext_term_end = lext_term_end;
ctx->reds = 0;
return 0;
}
} while (terms != 0);
if (terms == 0) {
+
if (ctx) {
ctx->state = B2TDecodeInit;
ctx->reds = reds;
@@ -5943,118 +6350,6 @@ Sint transcode_dist_obuf(ErtsDistOutputBuf* ob,
return 0;
return reds;
}
-
- if ((~dflags & DFLAG_UNLINK_ID)
- && ep[0] == SMALL_TUPLE_EXT
- && ep[1] == 4
- && ep[2] == SMALL_INTEGER_EXT
- && (ep[3] == DOP_UNLINK_ID_ACK || ep[3] == DOP_UNLINK_ID)) {
-
- if (ep[3] == DOP_UNLINK_ID_ACK) {
- /* Drop DOP_UNLINK_ID_ACK signal... */
- int i;
- for (i = 1; i < ob->eiov->vsize; i++) {
- if (ob->eiov->binv[i])
- driver_free_binary(ob->eiov->binv[i]);
- }
- ob->eiov->vsize = 1;
- ob->eiov->size = 0;
- }
- else {
- Eterm ctl_msg, remote, local, *tp;
- ErtsTranscodeDecodeState tds;
- Uint64 id;
- byte *ptr;
- ASSERT(ep[3] == DOP_UNLINK_ID);
- /*
- * Rewrite the DOP_UNLINK_ID signal into a
- * DOP_UNLINK signal and send an unlink ack
- * to the local sender.
- */
-
- /*
- * decode control message so we get info
- * needed for unlink ack signal to send...
- */
- ASSERT(get_int32(hdr + 4) == 0); /* No payload */
- ctl_msg = transcode_decode_ctl_msg(&tds, iov, eiov->vsize);
-
- ASSERT(is_tuple_arity(ctl_msg, 4));
-
- tp = tuple_val(ctl_msg);
- ASSERT(tp[1] == make_small(DOP_UNLINK_ID));
-
- if (!term_to_Uint64(tp[2], &id))
- ERTS_INTERNAL_ERROR("Invalid encoding of DOP_UNLINK_ID signal");
-
- local = tp[3];
- remote = tp[4];
-
- ASSERT(is_internal_pid(local));
- ASSERT(is_external_pid(remote));
-
- /*
- * Rewrite buffer to an unlink signal by removing
- * second element and change first element to
- * DOP_UNLINK. That is, to: {DOP_UNLINK, local, remote}
- */
-
- ptr = &ep[4];
- switch (*ptr) {
- case SMALL_INTEGER_EXT:
- ptr += 1;
- break;
- case INTEGER_EXT:
- ptr += 4;
- break;
- case SMALL_BIG_EXT:
- ptr += 1;
- ASSERT(*ptr <= 8);
- ptr += *ptr + 1;
- break;
- default:
- ERTS_INTERNAL_ERROR("Invalid encoding of DOP_UNLINK_ID signal");
- break;
- }
-
- ASSERT((ptr - ep) <= 16);
- ASSERT((ptr - ep) <= iov[2].iov_len);
-
- *(ptr--) = DOP_UNLINK;
- *(ptr--) = SMALL_INTEGER_EXT;
- *(ptr--) = 3;
- *ptr = SMALL_TUPLE_EXT;
-
- iov[2].iov_base = ptr;
- iov[2].iov_len -= (ptr - ep);
-
-#ifdef DEBUG
- {
- ErtsTranscodeDecodeState dbg_tds;
- Eterm new_ctl_msg = transcode_decode_ctl_msg(&dbg_tds,
- iov,
- eiov->vsize);
- ASSERT(is_tuple_arity(new_ctl_msg, 3));
- tp = tuple_val(new_ctl_msg);
- ASSERT(tp[1] == make_small(DOP_UNLINK));
- ASSERT(tp[2] == local);
- ASSERT(eq(tp[3], remote));
- transcode_decode_state_destroy(&dbg_tds);
- }
-#endif
-
- /* Send unlink ack to local sender... */
- erts_proc_sig_send_dist_unlink_ack(dep, dep->connection_id,
- remote, local, id);
-
- transcode_decode_state_destroy(&tds);
-
- reds -= 5;
- }
- if (reds < 0)
- return 0;
- return reds;
- }
start_r = r = reds*ERTS_TRANSCODE_REDS_FACT;