diff options
Diffstat (limited to 'erts/emulator/beam/external.c')
-rw-r--r-- | erts/emulator/beam/external.c | 945 |
1 files changed, 620 insertions, 325 deletions
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index d0a2c61834..081ce23e49 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1996-2022. All Rights Reserved. + * Copyright Ericsson AB 1996-2023. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,8 @@ #include "erl_proc_sig_queue.h" #include "erl_trace.h" #include "erl_global_literals.h" +#include "erl_term_hashing.h" + #define PASS_THROUGH 'p' @@ -95,8 +97,8 @@ static byte* enc_atom(ErtsAtomCacheMap *, Eterm, byte*, Uint64); static byte* enc_pid(ErtsAtomCacheMap *, Eterm, byte*, Uint64); struct B2TContext_t; static const byte* dec_term(ErtsDistExternal*, ErtsHeapFactory*, const byte*, Eterm*, struct B2TContext_t*, int); -static const byte* dec_atom(ErtsDistExternal *, const byte*, Eterm*); -static const byte* dec_pid(ErtsDistExternal *, ErtsHeapFactory*, const byte*, Eterm*, byte tag); +static const byte* dec_atom(ErtsDistExternal *, const byte*, Eterm*, int); +static const byte* dec_pid(ErtsDistExternal *, ErtsHeapFactory*, const byte*, Eterm*, byte tag, int); static Sint decoded_size(const byte *ep, const byte* endp, int internal_tags, struct B2TContext_t*); static BIF_RETTYPE term_to_binary_trap_1(BIF_ALIST_1); @@ -104,7 +106,6 @@ static Eterm erts_term_to_binary_int(Process* p, Sint bif_ix, Eterm Term, Eterm Uint64 dflags, Binary *context_b, int iovec, Uint fragment_size); -static Uint encode_size_struct2(ErtsAtomCacheMap *, Eterm, Uint64); static ErtsExtSzRes encode_size_struct_int(TTBSizeContext*, ErtsAtomCacheMap *acmp, Eterm obj, Uint64 dflags, Sint *reds, Uint *res); @@ -126,6 +127,72 @@ void erts_init_external(void) { return; } +static Uint32 local_node_hash; + +void erts_late_init_external(void) +{ + char hname[256], pidstr[21]; + size_t hname_len, pidstr_len; + ErtsMonotonicTime mtime, toffs; + ErtsBlockHashState hstate; + byte *lnid; + Uint lnid_ix, chunk_size; + int res; + + res = sys_get_hostname(&hname[0], sizeof(hname)); + if (res == 0) { + hname_len = strlen(hname); + } + else { + hname[0] = '\0'; + hname_len = 0; + } + + sys_get_pid(&pidstr[0], sizeof(pidstr)); + pidstr[20] = '\0'; + + pidstr_len = strlen(pidstr); + + toffs = erts_get_time_offset(); + mtime = erts_get_monotonic_time(NULL); + + lnid = (byte *) erts_alloc(ERTS_ALC_T_TMP, 8 + hname_len + pidstr_len); + + lnid_ix = 0; + + /* time offset... */ + lnid[lnid_ix++] = (byte) toffs & 0xff; + lnid[lnid_ix++] = (byte) (toffs >> 8) & 0xff; + lnid[lnid_ix++] = (byte) (toffs >> 16) & 0xff; + lnid[lnid_ix++] = (byte) (toffs >> 24) & 0xff; + lnid[lnid_ix++] = (byte) (toffs >> 32) & 0xff; + lnid[lnid_ix++] = (byte) (toffs >> 40) & 0xff; + lnid[lnid_ix++] = (byte) (toffs >> 48) & 0xff; + lnid[lnid_ix++] = (byte) (toffs >> 56) & 0xff; + + /* hostname... */ + + sys_memcpy(&lnid[lnid_ix], &hname[0], hname_len); + + lnid_ix += hname_len; + + /* pid... */ + memcpy(&lnid[lnid_ix], &pidstr[0], pidstr_len); + + /* + * Use least significant 32 bits of monotonic time as initial + * value to hash... + */ + erts_block_hash_init(&hstate, &lnid[0], lnid_ix, + (Uint32) (mtime & 0xffffffff)); + chunk_size = ERTS_UINT_MAX; + res = erts_block_hash(&local_node_hash, &chunk_size, &hstate); + ASSERT(res); (void) res; + ASSERT(chunk_size == lnid_ix); + + erts_free(ERTS_ALC_T_TMP, lnid); +} + #define ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES 255 #define ERTS_DIST_HDR_ATOM_CACHE_FLAG_BYTE_IX(IIX) \ @@ -659,7 +726,7 @@ erts_encode_dist_ext_size(Eterm term, return res; } -ErtsExtSzRes erts_encode_ext_size_2(Eterm term, unsigned dflags, Uint *szp) +ErtsExtSzRes erts_encode_ext_size_2(Eterm term, Uint64 dflags, Uint *szp) { ErtsExtSzRes res; *szp = 0; @@ -675,8 +742,14 @@ ErtsExtSzRes erts_encode_ext_size(Eterm term, Uint *szp) Uint erts_encode_ext_size_ets(Eterm term) { - return encode_size_struct2(NULL, term, - TERM_TO_BINARY_DFLAGS|DFLAG_ETS_COMPRESSED); + ErtsExtSzRes res; + Uint sz = 0; + + res = encode_size_struct_int(NULL, NULL, term, + TERM_TO_BINARY_DFLAGS|DFLAG_ETS_COMPRESSED, + NULL, &sz); + ASSERT(res == ERTS_EXT_SZ_OK); (void) res; + return sz; } @@ -1339,7 +1412,7 @@ static BIF_RETTYPE term_to_binary_trap_1(BIF_ALIST_1) 0, 0,bin, 0, ~((Uint) 0)); if (is_non_value(res)) { if (erts_set_gc_state(BIF_P, 1) - || MSO(BIF_P).overhead > BIN_VHEAP_SZ(BIF_P)) { + || MSO(BIF_P).overhead > BIF_P->bin_vheap_sz) { ERTS_VBUMP_ALL_REDS(BIF_P); } if (Opts == am_undefined) @@ -1354,7 +1427,7 @@ static BIF_RETTYPE term_to_binary_trap_1(BIF_ALIST_1) BIF_TRAP1(&term_to_binary_trap_export,BIF_P,res); } else { if (erts_set_gc_state(BIF_P, 1) - || MSO(BIF_P).overhead > BIN_VHEAP_SZ(BIF_P)) + || MSO(BIF_P).overhead > BIF_P->bin_vheap_sz) ERTS_BIF_YIELD_RETURN(BIF_P, res); else BIF_RET(res); @@ -1400,12 +1473,12 @@ BIF_RETTYPE term_to_iovec_1(BIF_ALIST_1) } static ERTS_INLINE int -parse_t2b_opts(Eterm opts, Uint *flagsp, int *levelp, int *iovecp, Uint *fsizep) +parse_t2b_opts(Eterm opts, Uint64 *flagsp, int *levelp, int *iovecp, Uint *fsizep) { int level = 0; int iovec = 0; - Uint flags = TERM_TO_BINARY_DFLAGS; - int deterministic = 0; + Uint64 flags = TERM_TO_BINARY_DFLAGS; + int deterministic = 0, local = 0; Uint fsize = ~((Uint) 0); /* one fragment */ while (is_list(opts)) { @@ -1418,17 +1491,20 @@ parse_t2b_opts(Eterm opts, Uint *flagsp, int *levelp, int *iovecp, Uint *fsizep) iovec = !0; } else if (arg == am_deterministic) { deterministic = 1; + } else if (arg == am_local) { + local = !0; } else if (is_tuple(arg) && *(tp = tuple_val(arg)) == make_arityval(2)) { if (tp[1] == am_minor_version && is_small(tp[2])) { switch (signed_val(tp[2])) { case 0: - flags = TERM_TO_BINARY_DFLAGS & ~DFLAG_NEW_FLOATS; + flags = (TERM_TO_BINARY_DFLAGS + & ~(DFLAG_NEW_FLOATS | DFLAG_UTF8_ATOMS)); break; case 1: /* Current default... */ - flags = TERM_TO_BINARY_DFLAGS; + flags = TERM_TO_BINARY_DFLAGS & ~DFLAG_UTF8_ATOMS; break; case 2: - flags = TERM_TO_BINARY_DFLAGS | DFLAG_UTF8_ATOMS; + flags = TERM_TO_BINARY_DFLAGS; break; default: return 0; /* badarg */ @@ -1459,6 +1535,14 @@ parse_t2b_opts(Eterm opts, Uint *flagsp, int *levelp, int *iovecp, Uint *fsizep) return 0; /* badarg */ } + if (deterministic && local) { + return 0; /* badarg */ + } + + if (local) { + flags |= DFLAG_LOCAL_EXT; + } + if (deterministic) { flags |= DFLAG_DETERMINISTIC; } @@ -1476,7 +1560,7 @@ parse_t2b_opts(Eterm opts, Uint *flagsp, int *levelp, int *iovecp, Uint *fsizep) BIF_RETTYPE term_to_binary_2(BIF_ALIST_2) { int level; - Uint flags; + Uint64 flags; Eterm res; if (!parse_t2b_opts(BIF_ARG_2, &flags, &level, NULL, NULL)) { @@ -1503,7 +1587,7 @@ BIF_RETTYPE term_to_binary_2(BIF_ALIST_2) BIF_RETTYPE term_to_iovec_2(BIF_ALIST_2) { int level; - Uint flags; + Uint64 flags; Eterm res; if (!parse_t2b_opts(BIF_ARG_2, &flags, &level, NULL, NULL)) { @@ -1532,7 +1616,7 @@ erts_debug_term_to_binary(Process *p, Eterm term, Eterm opts) { Eterm ret; int level, iovec; - Uint flags; + Uint64 flags; Uint fsize; if (!parse_t2b_opts(opts, &flags, &level, &iovec, &fsize)) { @@ -1579,9 +1663,12 @@ enum B2TState { /* order is somewhat significant */ typedef struct { Sint heap_size; - int terms; - const byte* ep; + Uint32 terms; int atom_extra_skip; + const byte* ep; + ErtsBlockHashState lext_state; + const byte *lext_hash; + Uint32 lext_term_end; } B2TSizeContext; typedef struct { @@ -1590,6 +1677,7 @@ typedef struct { Eterm* next; ErtsHeapFactory factory; int remaining_n; + int internal_nc; char* remaining_bytes; ErtsPStack map_array; } B2TDecodeContext; @@ -1819,7 +1907,6 @@ static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1) return binary_to_term_int(BIF_P, THE_NON_VALUE, ERTS_MAGIC_BIN_DATA(context_bin)); } - #define B2T_BYTES_PER_REDUCTION 128 #define B2T_MEMCPY_FACTOR 8 @@ -1946,6 +2033,7 @@ static BIF_RETTYPE binary_to_term_int(Process* p, Eterm bin, B2TContext *ctx) ctx->u.dc.ep = ctx->b2ts.extp; ctx->u.dc.res = (Eterm) (UWord) NULL; ctx->u.dc.next = &ctx->u.dc.res; + ctx->u.dc.internal_nc = 0; erts_factory_proc_prealloc_init(&ctx->u.dc.factory, p, ctx->heap_size); ctx->u.dc.map_array.pstart = NULL; ctx->state = B2TDecode; @@ -2117,34 +2205,11 @@ Eterm external_size_2(BIF_ALIST_2) { Uint size = 0; - Uint flags = TERM_TO_BINARY_DFLAGS; - - while (is_list(BIF_ARG_2)) { - Eterm arg = CAR(list_val(BIF_ARG_2)); - Eterm* tp; + int level; + Uint64 flags; - if (is_tuple(arg) && *(tp = tuple_val(arg)) == make_arityval(2)) { - if (tp[1] == am_minor_version && is_small(tp[2])) { - switch (signed_val(tp[2])) { - case 0: - flags &= ~DFLAG_NEW_FLOATS; - break; - case 1: - break; - default: - goto error; - } - } else { - goto error; - } - } else { - error: - BIF_ERROR(BIF_P, BADARG); - } - BIF_ARG_2 = CDR(list_val(BIF_ARG_2)); - } - if (is_not_nil(BIF_ARG_2)) { - goto error; + if (!parse_t2b_opts(BIF_ARG_2, &flags, &level, NULL, NULL)) { + BIF_ERROR(BIF_P, BADARG); } switch (erts_encode_ext_size_2(BIF_ARG_1, flags, &size)) { @@ -2816,45 +2881,80 @@ enc_atom(ErtsAtomCacheMap *acmp, Eterm atom, byte *ep, Uint64 dflags) } /* - * We use this atom as sysname in local pid/port/refs - * for the ETS compressed format + * We use INTERNAL_LOCAL_SYSNAME to mark internal node for pid/port/refs + * in the ETS compressed format and local format. * */ -#define INTERNAL_LOCAL_SYSNAME am_ErtsSecretAtom +#define INTERNAL_LOCAL_SYSNAME NIL -static byte* -enc_pid(ErtsAtomCacheMap *acmp, Eterm pid, byte* ep, Uint64 dflags) +static ERTS_INLINE byte * +enc_internal_pid(ErtsAtomCacheMap *acmp, Eterm pid, byte* ep, Uint64 dflags) { - Uint on, os; - Eterm sysname = ((is_internal_pid(pid) && (dflags & DFLAG_ETS_COMPRESSED)) - ? INTERNAL_LOCAL_SYSNAME : pid_node_name(pid)); - Uint32 creation = pid_creation(pid); + Uint32 number, serial, creation; - *ep++ = NEW_PID_EXT; + ASSERT(is_internal_pid(pid)); - ep = enc_atom(acmp, sysname, ep, dflags); + *ep++ = NEW_PID_EXT; - if (is_internal_pid(pid)) { - on = internal_pid_number(pid); - os = internal_pid_serial(pid); + number = internal_pid_number(pid); + serial = internal_pid_serial(pid); + if (dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT)) { + *ep++ = NIL_EXT; /* INTERNAL_LOCAL_NODE */ + creation = 0; } else { - on = external_pid_number(pid); - os = external_pid_serial(pid); + Eterm sysname = internal_pid_node_name(pid); + creation = internal_pid_creation(pid); + ep = enc_atom(acmp, sysname, ep, dflags); } - put_int32(on, ep); + put_int32(number, ep); + ep += 4; + put_int32(serial, ep); + ep += 4; + put_int32(creation, ep); + ep += 4; + return ep; + +} + +static ERTS_INLINE byte * +enc_external_pid(ErtsAtomCacheMap *acmp, Eterm pid, byte* ep, Uint64 dflags) +{ + Uint32 number, serial, creation; + Eterm sysname; + + ASSERT(is_external_pid(pid)); + + *ep++ = NEW_PID_EXT; + + ASSERT(is_external_pid(pid)); + number = external_pid_number(pid); + serial = external_pid_serial(pid); + sysname = external_pid_node_name(pid); + creation = external_pid_creation(pid); + ep = enc_atom(acmp, sysname, ep, dflags); + + put_int32(number, ep); ep += 4; - put_int32(os, ep); + put_int32(serial, ep); ep += 4; put_int32(creation, ep); ep += 4; return ep; } +static byte* +enc_pid(ErtsAtomCacheMap *acmp, Eterm pid, byte* ep, Uint64 dflags) +{ + if (is_internal_pid(pid)) + return enc_internal_pid(acmp, pid, ep, dflags); + return enc_external_pid(acmp, pid, ep, dflags); +} + /* Expect an atom in plain text or cached */ static const byte* -dec_atom(ErtsDistExternal *edep, const byte* ep, Eterm* objp) +dec_atom(ErtsDistExternal *edep, const byte* ep, Eterm* objp, int internal_nc) { Uint len; int n; @@ -2919,7 +3019,12 @@ dec_atom(ErtsDistExternal *edep, const byte* ep, Eterm* objp) } *objp = make_atom(n); break; - + case NIL_EXT: + if (internal_nc) { + *objp = INTERNAL_LOCAL_SYSNAME; + break; + } + /* else: fail... */ default: error: *objp = NIL; /* Don't leave a hole in the heap */ @@ -2947,7 +3052,7 @@ static ERTS_INLINE ErlNode* dec_get_node(Eterm sysname, Uint32 creation, Eterm b static const byte* dec_pid(ErtsDistExternal *edep, ErtsHeapFactory* factory, const byte* ep, - Eterm* objp, byte tag) + Eterm* objp, byte tag, int internal_nc) { Eterm sysname; Uint data; @@ -2958,7 +3063,7 @@ dec_pid(ErtsDistExternal *edep, ErtsHeapFactory* factory, const byte* ep, *objp = NIL; /* In case we fail, don't leave a hole in the heap */ /* eat first atom */ - if ((ep = dec_atom(edep, ep, &sysname)) == NULL) + if ((ep = dec_atom(edep, ep, &sysname, internal_nc)) == NULL) return NULL; num = get_uint32(ep); ep += 4; @@ -3071,6 +3176,7 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, FloatDef f; register Sint r = 0; int use_iov = 0; + byte *lext_hash = NULL; /* initialize to avoid faulty warning... */ /* The following variables are only used during encoding of * a map when the `deterministic` option is active. */ @@ -3088,10 +3194,39 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, obj = ctx->obj; map_array = ctx->map_array; next_map_element = ctx->next_map_element; + lext_hash = ctx->lext_hash; if (is_non_value(obj)) { goto outer_loop; } + else { + goto L_jump_start; + } } + if (ctx->continue_make_lext_hash) { + lext_hash = ctx->lext_hash; + ep = ctx->ep; + if (use_iov) { + goto continue_make_lext_hash_iov; + } + else { + goto continue_make_lext_hash_bin; + } + } + } + + /* We only pass here once at the start of the encoding... */ + if (dflags & DFLAG_LOCAL_EXT) { + *ep++ = LOCAL_EXT; + lext_hash = ep; + if (ctx) + ctx->lext_hash = ep; + ep += 4; /* 32-bit hash */ + if (use_iov) { + ASSERT(ctx); + store_in_vec(ctx, ep, NULL, THE_NON_VALUE, NULL, 0); + /* current vlen is now where to start calculating the hash */ + ctx->lext_vlen = ctx->vlen; + } } goto L_jump_start; @@ -3210,7 +3345,6 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, long num_reductions = r; n = next_map_element - map_array; - ASSERT(n > MAP_SMALL_MAP_LIMIT); if (ctx == NULL) { /* No context means that the external representation of term * being encoded will fit in a heap binary (64 bytes). This @@ -3375,54 +3509,89 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, } break; - case PID_DEF: case EXTERNAL_PID_DEF: - ep = enc_pid(acmp, obj, ep, dflags); + ep = enc_external_pid(acmp, obj, ep, dflags); + break; + + case PID_DEF: + ep = enc_internal_pid(acmp, obj, ep, dflags); break; - case REF_DEF: case EXTERNAL_REF_DEF: { + Eterm sysname; Uint32 *ref_num; - Eterm sysname = (((dflags & DFLAG_ETS_COMPRESSED) && is_internal_ref(obj)) - ? INTERNAL_LOCAL_SYSNAME : ref_node_name(obj)); - Uint32 creation = ref_creation(obj); + Uint32 creation; - erts_magic_ref_save_bin(obj); + *ep++ = NEWER_REFERENCE_EXT; + + ref_num = external_ref_numbers(obj); + i = external_ref_no_numbers(obj); + put_int16(i, ep); + ep += 2; + + sysname = external_ref_node_name(obj); + creation = external_ref_creation(obj); + goto ref_common; + + case REF_DEF: *ep++ = NEWER_REFERENCE_EXT; - i = ref_no_numbers(obj); - put_int16(i, ep); - ep += 2; - ep = enc_atom(acmp, sysname, ep, dflags); + + erts_magic_ref_save_bin(obj); + + ref_num = internal_ref_numbers(obj); + i = internal_ref_no_numbers(obj); + put_int16(i, ep); + ep += 2; + + if (dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT)) { + *ep++ = NIL_EXT; /* INTERNAL_LOCAL_NODE */ + creation = 0; + } + else { + sysname = internal_ref_node_name(obj); + creation = internal_ref_creation(obj); + ref_common: + ep = enc_atom(acmp, sysname, ep, dflags); + } + put_int32(creation, ep); ep += 4; - ref_num = ref_numbers(obj); for (j = 0; j < i; j++) { put_int32(ref_num[j], ep); ep += 4; } break; } - case PORT_DEF: case EXTERNAL_PORT_DEF: { - Eterm sysname = (((dflags & DFLAG_ETS_COMPRESSED) && is_internal_port(obj)) - ? INTERNAL_LOCAL_SYSNAME : port_node_name(obj)); - Uint32 creation = port_creation(obj); - byte *tagp = ep++; - Uint64 num; - - ep = enc_atom(acmp, sysname, ep, dflags); - num = port_number(obj); - if (num > ERTS_MAX_V3_PORT_NUMBER) { - *tagp = V4_PORT_EXT; - put_int64(num, ep); - ep += 8; - } - else { - *tagp = NEW_PORT_EXT; - put_int32(num, ep); - ep += 4; - } + Eterm sysname; + Uint64 number; + Uint32 creation; + + *ep++ = V4_PORT_EXT; + number = external_port_number(obj); + sysname = external_port_node_name(obj); + creation = external_port_creation(obj); + + goto port_common; + + case PORT_DEF: + + *ep++ = V4_PORT_EXT; + number = internal_port_number(obj); + if (dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT)) { + *ep++ = NIL_EXT; /* INTERNAL_LOCAL_NODE */ + creation = 0; + } + else { + sysname = internal_port_node_name(obj); + creation = internal_port_creation(obj); + port_common: + ep = enc_atom(acmp, sysname, ep, dflags); + } + + put_int64(number, ep); + ep += 8; put_int32(creation, ep); ep += 4; break; @@ -3481,9 +3650,20 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, Eterm *kptr = flatmap_get_keys(mp); Eterm *vptr = flatmap_get_values(mp); - WSTACK_PUSH4(s, (UWord)kptr, (UWord)vptr, - ENC_MAP_PAIR, size); - } + if (dflags & DFLAG_DETERMINISTIC) { + ASSERT(map_array == NULL); + next_map_element = map_array = alloc_map_array(size); + while (size--) { + *next_map_element++ = *kptr++; + *next_map_element++ = *vptr++; + } + WSTACK_PUSH2(s, ENC_START_SORTING_MAP, THE_NON_VALUE); + } + else { + WSTACK_PUSH4(s, (UWord)kptr, (UWord)vptr, + ENC_MAP_PAIR, size); + } + } } else { Eterm hdr; Uint node_sz; @@ -3741,6 +3921,68 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, if (use_iov) store_in_vec(ctx, ep, NULL, THE_NON_VALUE, NULL, 0); } + if (dflags & DFLAG_LOCAL_EXT) { + int done; + Uint32 hash; + Uint chunk_len; + + if (use_iov) { + ASSERT(ctx); + erts_iov_block_hash_init(&ctx->lext_state.iov_block, + &ctx->iov[ctx->lext_vlen], + ctx->vlen - ctx->lext_vlen, + local_node_hash); + + continue_make_lext_hash_iov: + /* Do 128 bytes per reduction... */ + chunk_len = (Uint) r*128; + + done = erts_iov_block_hash(&hash, &chunk_len, + &ctx->lext_state.iov_block); + } + else { + ErtsBlockHashState lext_state_buf, *lext_state; + byte *ep_start = lext_hash + 4 /* 32 bit hash value */; + Sint len = ep - ep_start; + + ASSERT(len >= 0); + + lext_state = ctx ? &ctx->lext_state.block : &lext_state_buf; + + erts_block_hash_init(lext_state, ep_start, len, local_node_hash); + + if (!ctx) { + /* Do it all at once... */ + chunk_len = ERTS_UINT_MAX; + } + else { + continue_make_lext_hash_bin: + lext_state = &ctx->lext_state.block; + /* Do 128 bytes per reduction... */ + chunk_len = (Uint) r*128; + } + + done = erts_block_hash(&hash, &chunk_len, lext_state); + } + + if (!ctx) { + ASSERT(done); + } + else { + if (!done) { + /* yield; more work calculating hash... */ + ctx->ep = ep; + ctx->continue_make_lext_hash = !0; + *reds = 0; + return -1; + } + r -= chunk_len/128; + *reds = r; + } + + put_int32(hash, lext_hash); + } + *res = ep; return 0; } @@ -3924,7 +4166,7 @@ dec_term(ErtsDistExternal *edep, { #define PSTACK_TYPE struct dec_term_map PSTACK_DECLARE(map_array, 10); - int n; + int n, internal_nc = ets_decode; ErtsAtomEncoding char_enc; register Eterm* hp; /* Please don't take the address of hp */ Eterm* next; @@ -3938,6 +4180,7 @@ dec_term(ErtsDistExternal *edep, next = ctx->u.dc.next; ep = ctx->u.dc.ep; factory = &ctx->u.dc.factory; + internal_nc = ctx->u.dc.internal_nc; if (ctx->state != B2TDecode) { int n_limit = reds; @@ -4029,6 +4272,8 @@ dec_term(ErtsDistExternal *edep, objp = next; next = (Eterm *) *objp; + continue_this_obj: + switch (*ep++) { case INTEGER_EXT: { @@ -4272,7 +4517,7 @@ dec_term_atom_common: case PID_EXT: case NEW_PID_EXT: factory->hp = hp; - ep = dec_pid(edep, factory, ep, objp, ep[-1]); + ep = dec_pid(edep, factory, ep, objp, ep[-1], internal_nc); hp = factory->hp; if (ep == NULL) { goto error; @@ -4288,7 +4533,7 @@ dec_term_atom_common: Uint32 cre; byte tag = ep[-1]; - if ((ep = dec_atom(edep, ep, &sysname)) == NULL) { + if ((ep = dec_atom(edep, ep, &sysname, internal_nc)) == NULL) { goto error; } if (tag == V4_PORT_EXT) { @@ -4348,7 +4593,7 @@ dec_term_atom_common: ref_words = 1; - if ((ep = dec_atom(edep, ep, &sysname)) == NULL) + if ((ep = dec_atom(edep, ep, &sysname, internal_nc)) == NULL) goto error; if ((r0 = get_int32(ep)) >= MAX_REFERENCE ) goto error; @@ -4365,7 +4610,7 @@ dec_term_atom_common: ref_words = get_int16(ep); ep += 2; - if ((ep = dec_atom(edep, ep, &sysname)) == NULL) + if ((ep = dec_atom(edep, ep, &sysname, internal_nc)) == NULL) goto error; cre = get_int8(ep); @@ -4383,7 +4628,7 @@ dec_term_atom_common: ref_words = get_int16(ep); ep += 2; - if ((ep = dec_atom(edep, ep, &sysname)) == NULL) + if ((ep = dec_atom(edep, ep, &sysname, internal_nc)) == NULL) goto error; cre = get_int32(ep); @@ -4416,7 +4661,7 @@ dec_term_atom_common: } if (ref_words != ERTS_REF_NUMBERS) { int i; - if (ref_words > ERTS_REF_NUMBERS) + if (ref_words > ERTS_MAX_REF_NUMBERS) goto error; /* Not a ref that we created... */ for (i = ref_words; i < ERTS_REF_NUMBERS; i++) ref_num[i] = 0; @@ -4428,12 +4673,15 @@ dec_term_atom_common: } else { /* Check if it is a pid reference... */ - Eterm pid = erts_pid_ref_lookup(ref_num); + Eterm pid = erts_pid_ref_lookup(ref_num, ref_words); if (is_internal_pid(pid)) { write_pid_ref_thing(hp, ref_num[0], ref_num[1], ref_num[2], pid); hp += ERTS_PID_REF_THING_SIZE; } + else if (is_non_value(pid)) { + goto error; /* invalid reference... */ + } else { /* Check if it is a magic reference... */ ErtsMagicBinary *mb = erts_magic_ref_lookup_bin(ref_num); @@ -4618,10 +4866,10 @@ dec_term_atom_common: Eterm temp; Sint arity; - if ((ep = dec_atom(edep, ep, &mod)) == NULL) { + if ((ep = dec_atom(edep, ep, &mod, 0)) == NULL) { goto error; } - if ((ep = dec_atom(edep, ep, &name)) == NULL) { + if ((ep = dec_atom(edep, ep, &name, 0)) == NULL) { goto error; } factory->hp = hp; @@ -4736,7 +4984,7 @@ dec_term_atom_common: *objp = make_fun(funp); /* Module */ - if ((ep = dec_atom(edep, ep, &module)) == NULL) { + if ((ep = dec_atom(edep, ep, &module, 0)) == NULL) { goto error; } factory->hp = hp; @@ -4849,6 +5097,13 @@ dec_term_atom_common: break; } + case LOCAL_EXT: + internal_nc = !0; + if (ctx) + ctx->u.dc.internal_nc = !0; + ep += 4; /* 32-bit hash (verified in decoded_size()) */ + goto continue_this_obj; + default: goto error; } @@ -4949,23 +5204,88 @@ error_map_fixup: return NULL; } -/* returns the number of bytes needed to encode an object - to a sequence of bytes - N.B. That this must agree with to_external2() above!!! - (except for cached atoms) */ -static Uint encode_size_struct2(ErtsAtomCacheMap *acmp, - Eterm obj, - Uint64 dflags) { - Uint size = 0; - ErtsExtSzRes res = encode_size_struct_int(NULL, acmp, obj, - dflags, NULL, - &size); - /* - * encode_size_struct2() only allowed when - * we know the result will always be OK! - */ - ASSERT(res == ERTS_EXT_SZ_OK); (void) res; - return (Uint) size; +static Uint +encode_atom_size(ErtsAtomCacheMap *acmp, Eterm atom, Uint64 dflags) +{ + ASSERT(is_atom(atom)); + if (dflags & DFLAG_ETS_COMPRESSED) { + if (atom_val(atom) >= (1<<16)) { + return (Uint) 1 + 3; + } + else { + return (Uint) 1 + 2; + } + } + else { + Atom *a = atom_tab(atom_val(atom)); + int alen; + Uint result; + if ((dflags & DFLAG_UTF8_ATOMS) || a->latin1_chars < 0) { + alen = a->len; + result = (Uint) 1 + 1 + alen; + if (alen > 255) { + result++; /* ATOM_UTF8_EXT (not small) */ + } + } + else { + alen = a->latin1_chars; + result = (Uint) 1 + 1 + alen; + if (alen > 255 || !(dflags & DFLAG_SMALL_ATOM_TAGS)) + result++; /* ATOM_EXT (not small) */ + } + insert_acache_map(acmp, atom, dflags); + return result; + } +} + +static Uint +encode_internal_pid_size(ErtsAtomCacheMap *acmp, Eterm pid, Uint64 dflags) +{ + int nlen; + ASSERT(is_internal_pid(pid)); + nlen = ((dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT)) + ? 1 + : encode_atom_size(acmp, internal_pid_node_name(pid), dflags)); + return (Uint) 1 + nlen + 4 + 4 + 4; +} + +static Uint +encode_external_pid_size(ErtsAtomCacheMap *acmp, Eterm pid, Uint64 dflags) +{ + int nlen; + ASSERT(is_external_pid(pid)); + nlen = encode_atom_size(acmp, external_pid_node_name(pid), dflags); + return (Uint) 1 + nlen + 4 + 4 + 4; +} + +static Uint +encode_pid_size(ErtsAtomCacheMap *acmp, Eterm pid, Uint64 dflags) +{ + if (is_internal_pid(pid)) + return encode_internal_pid_size(acmp, pid, dflags); + ASSERT(is_external_pid(pid)); + return encode_external_pid_size(acmp, pid, dflags); +} + +static Uint +encode_small_size(ErtsAtomCacheMap *acmp, Eterm pid, Uint64 dflags) +{ + Sint val = signed_val(pid); + Uint result; + + if ((Uint)val < 256) + result = (Uint) 1 + 1; /* SMALL_INTEGER_EXT */ + else if (sizeof(Sint) == 4 || IS_SSMALL32(val)) + result = (Uint) 1 + 4; /* INTEGER_EXT */ + else { + int i; + DeclareTmpHeapNoproc(tmp_big,2); + UseTmpHeapNoproc(2); + i = big_bytes(small_to_big(val, tmp_big)); + result = (Uint) 1 + 1 + 1 + i; /* SMALL_BIG_EXT */ + UnUseTmpHeapNoproc(2); + } + return result; } static ErtsExtSzRes @@ -4978,14 +5298,24 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, Sint r = 0; int vlen = -1; - if (ctx) { + if (!ctx) { + if (dflags & DFLAG_LOCAL_EXT) + result += 5; + } + else { WSTACK_CHANGE_ALLOCATOR(s, ERTS_ALC_T_SAVED_ESTACK); r = *reds; vlen = ctx->vlen; - if (!ctx->wstack.wstart) + if (!ctx->wstack.wstart) { ctx->last_result = result; + if (dflags & DFLAG_LOCAL_EXT) { + result += 5; + if (vlen >= 0) + vlen++; + } + } else { /* restore saved stack */ WSTACK_RESTORE(s, &ctx->wstack); result = ctx->result; @@ -5014,49 +5344,10 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, result++; break; case ATOM_DEF: - if (dflags & DFLAG_ETS_COMPRESSED) { - if (atom_val(obj) >= (1<<16)) { - result += 1 + 3; - } - else { - result += 1 + 2; - } - } - else { - Atom *a = atom_tab(atom_val(obj)); - int alen; - if ((dflags & DFLAG_UTF8_ATOMS) || a->latin1_chars < 0) { - alen = a->len; - result += 1 + 1 + alen; - if (alen > 255) { - result++; /* ATOM_UTF8_EXT (not small) */ - } - } - else { - alen = a->latin1_chars; - result += 1 + 1 + alen; - if (alen > 255 || !(dflags & DFLAG_SMALL_ATOM_TAGS)) - result++; /* ATOM_EXT (not small) */ - } - insert_acache_map(acmp, obj, dflags); - } + result += encode_atom_size(acmp, obj, dflags); break; case SMALL_DEF: - { - Sint val = signed_val(obj); - - if ((Uint)val < 256) - result += 1 + 1; /* SMALL_INTEGER_EXT */ - else if (sizeof(Sint) == 4 || IS_SSMALL32(val)) - result += 1 + 4; /* INTEGER_EXT */ - else { - DeclareTmpHeapNoproc(tmp_big,2); - UseTmpHeapNoproc(2); - i = big_bytes(small_to_big(val, tmp_big)); - result += 1 + 1 + 1 + i; /* SMALL_BIG_EXT */ - UnUseTmpHeapNoproc(2); - } - } + result += encode_small_size(acmp, obj, dflags); break; case BIG_DEF: i = big_bytes(obj); @@ -5068,22 +5359,47 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, result += 1 + 4 + 1 + i; /* tag,size,sign,digits */ break; case EXTERNAL_PID_DEF: + result += encode_external_pid_size(acmp, obj, dflags); + break; case PID_DEF: - result += (1 + encode_size_struct2(acmp, pid_node_name(obj), dflags) + - 4 + 4 + 4); + result += encode_internal_pid_size(acmp, obj, dflags); + break; + case EXTERNAL_REF_DEF: { + int nlen = encode_atom_size(acmp, + external_ref_node_name(obj), + dflags); + i = external_ref_no_numbers(obj); + result += (1 + 2 + nlen + 4 + 4*i); break; - case EXTERNAL_REF_DEF: - case REF_DEF: - i = ref_no_numbers(obj); - result += (1 + 2 + encode_size_struct2(acmp, ref_node_name(obj), dflags) + - 4 + 4*i); + } + case REF_DEF: { + int nlen; + i = internal_ref_no_numbers(obj); + if (dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT)) { + nlen = 1; + } + else { + nlen = encode_atom_size(acmp, + internal_ref_node_name(obj), + dflags); + } + result += (1 + 2 + nlen + 4 + 4*i); + break; + } + case EXTERNAL_PORT_DEF: { + int nlen = encode_atom_size(acmp, + external_port_node_name(obj), + dflags); + result += (1 + nlen + 8 + 4); break; - case EXTERNAL_PORT_DEF: + } case PORT_DEF: { - Uint64 num = port_number(obj); - result += (num > ERTS_MAX_V3_PORT_NUMBER) ? 8 : 4; - result += (1 + encode_size_struct2(acmp, port_node_name(obj), dflags) - /* num */ + 4); + int nlen = ((dflags & (DFLAG_ETS_COMPRESSED|DFLAG_LOCAL_EXT)) + ? 1 + : encode_atom_size(acmp, + internal_port_node_name(obj), + dflags)); + result += (1 + nlen + 8 + 4); break; } case LIST_DEF: { @@ -5270,8 +5586,8 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, if (is_local_fun(funp)) { result += 20+1+1+4; /* New ID + Tag */ result += 4; /* Length field (number of free variables */ - result += encode_size_struct2(acmp, funp->creator, dflags); - result += encode_size_struct2(acmp, funp->entry.fun->module, dflags); + result += encode_pid_size(acmp, funp->creator, dflags); + result += encode_atom_size(acmp, funp->entry.fun->module, dflags); result += 2 * (1+4); /* Index, Uniq */ if (funp->num_free > 1) { WSTACK_PUSH2(s, (UWord) (funp->env + 1), @@ -5287,9 +5603,9 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, ASSERT(is_external_fun(funp) && funp->next == NULL); result += 1; - result += encode_size_struct2(acmp, ep->info.mfa.module, dflags); - result += encode_size_struct2(acmp, ep->info.mfa.function, dflags); - result += encode_size_struct2(acmp, make_small(ep->info.mfa.arity), dflags); + result += encode_atom_size(acmp, ep->info.mfa.module, dflags); + result += encode_atom_size(acmp, ep->info.mfa.function, dflags); + result += encode_small_size(acmp, make_small(ep->info.mfa.arity), dflags); } break; } @@ -5345,8 +5661,6 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, return ERTS_EXT_SZ_OK; } - - static Sint decoded_size(const byte *ep, const byte* endp, int internal_tags, B2TContext* ctx) { @@ -5354,6 +5668,8 @@ decoded_size(const byte *ep, const byte* endp, int internal_tags, B2TContext* ct int atom_extra_skip; Uint n; SWord reds; + const byte *lext_hash; + Uint32 lext_term_end; /* Keep track of the current number of sub terms remaining to be decoded. * @@ -5366,24 +5682,6 @@ decoded_size(const byte *ep, const byte* endp, int internal_tags, B2TContext* ct */ Uint32 terms; - if (ctx) { - reds = ctx->reds; - if (ctx->u.sc.ep) { - heap_size = ctx->u.sc.heap_size; - terms = ctx->u.sc.terms; - ep = ctx->u.sc.ep; - atom_extra_skip = ctx->u.sc.atom_extra_skip; - goto init_done; - } - } - else - ERTS_UNDEF(reds, 0); - - heap_size = 0; - terms = 1; - atom_extra_skip = 0; -init_done: - #define SKIP(sz) \ do { \ if ((sz) <= endp-ep) { \ @@ -5413,6 +5711,38 @@ init_done: if (terms < before) goto error; \ } while (0) + + if (ctx) { + reds = ctx->reds; + if (ctx->u.sc.ep) { + ep = ctx->u.sc.ep; + atom_extra_skip = ctx->u.sc.atom_extra_skip; + heap_size = ctx->u.sc.heap_size; + lext_hash = ctx->u.sc.lext_hash; + lext_term_end = ctx->u.sc.lext_term_end; + terms = ctx->u.sc.terms; + if (terms == lext_term_end) { + ASSERT(lext_hash); + goto continue_check_lext; + } + goto init_done; + } + } + else + ERTS_UNDEF(reds, 0); + + heap_size = 0; + terms = 1; + atom_extra_skip = 0; + /* + * lext_hash != NULL and local_term_end != ~0 when decoding local external + * term format... + */ + lext_hash = NULL; + lext_term_end = ~0; + +init_done: + ASSERT(terms > 0); do { int tag; @@ -5541,6 +5871,18 @@ init_done: terms++; break; case NIL_EXT: + if (atom_extra_skip) { + /* + * atom_extra_skip != 0 should only appear due to local encoding, + * or compressed ets encoding, of a node name in internal + * pids/ports/refs. If not currently inside a local encoding, + * this is an error... + */ + if (!lext_hash && !internal_tags) + goto error; + SKIP(atom_extra_skip); + atom_extra_skip = 0; + } break; case LIST_EXT: CHKSIZE(4); @@ -5687,22 +6029,87 @@ init_done: SKIP(2+sizeof(ProcBin)); heap_size += PROC_BIN_SIZE + ERL_SUB_BIN_SIZE; break; + CHKSIZE(1); + case LOCAL_EXT: + /* + * Currently the hash is 4 bytes large... + */ + CHKSIZE(4); + lext_hash = ep; + lext_term_end = terms - 1; + terms++; + ep += 4; + break; default: goto error; } terms--; + if (terms == lext_term_end) { + ErtsBlockHashState lext_state_buf, *lext_state; + const byte *ep_start = lext_hash + 4 /* 32 bit hash value */; + Sint len = ep - ep_start; + Uint chunk_len; + Uint32 hash; + + ASSERT(lext_hash); + + if (len <= 0) { + /* no terms */ + goto error; + } + + lext_state = ctx ? &ctx->u.sc.lext_state : &lext_state_buf; + + erts_block_hash_init(lext_state, ep_start, len, local_node_hash); + + if (!ctx) { + /* Do it all at once... */ + chunk_len = ERTS_UINT_MAX; + } + else { + continue_check_lext: + lext_state = &ctx->u.sc.lext_state; + /* 'reds' has been scaled up to "bytes per reds"... */ + chunk_len = (Uint) reds; + } + + if (!erts_block_hash(&hash, &chunk_len, lext_state)) { + /* yield; more work calculating hash... */ + ASSERT(ctx); + reds = 0; + } + else { + reds -= chunk_len; + if (hash != get_int32(lext_hash)) { + /* + * Hash presented in external format did not match the + * calculated hash... + */ + goto error; + } + lext_hash = NULL; + lext_term_end = ~0; + } + + } + + ASSERT(terms != ~0); + if (ctx && --reds <= 0 && terms != 0) { ctx->u.sc.heap_size = heap_size; ctx->u.sc.terms = terms; ctx->u.sc.ep = ep; ctx->u.sc.atom_extra_skip = atom_extra_skip; + ctx->u.sc.lext_hash = lext_hash; + ctx->u.sc.lext_term_end = lext_term_end; ctx->reds = 0; return 0; } } while (terms != 0); if (terms == 0) { + if (ctx) { ctx->state = B2TDecodeInit; ctx->reds = reds; @@ -5943,118 +6350,6 @@ Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, return 0; return reds; } - - if ((~dflags & DFLAG_UNLINK_ID) - && ep[0] == SMALL_TUPLE_EXT - && ep[1] == 4 - && ep[2] == SMALL_INTEGER_EXT - && (ep[3] == DOP_UNLINK_ID_ACK || ep[3] == DOP_UNLINK_ID)) { - - if (ep[3] == DOP_UNLINK_ID_ACK) { - /* Drop DOP_UNLINK_ID_ACK signal... */ - int i; - for (i = 1; i < ob->eiov->vsize; i++) { - if (ob->eiov->binv[i]) - driver_free_binary(ob->eiov->binv[i]); - } - ob->eiov->vsize = 1; - ob->eiov->size = 0; - } - else { - Eterm ctl_msg, remote, local, *tp; - ErtsTranscodeDecodeState tds; - Uint64 id; - byte *ptr; - ASSERT(ep[3] == DOP_UNLINK_ID); - /* - * Rewrite the DOP_UNLINK_ID signal into a - * DOP_UNLINK signal and send an unlink ack - * to the local sender. - */ - - /* - * decode control message so we get info - * needed for unlink ack signal to send... - */ - ASSERT(get_int32(hdr + 4) == 0); /* No payload */ - ctl_msg = transcode_decode_ctl_msg(&tds, iov, eiov->vsize); - - ASSERT(is_tuple_arity(ctl_msg, 4)); - - tp = tuple_val(ctl_msg); - ASSERT(tp[1] == make_small(DOP_UNLINK_ID)); - - if (!term_to_Uint64(tp[2], &id)) - ERTS_INTERNAL_ERROR("Invalid encoding of DOP_UNLINK_ID signal"); - - local = tp[3]; - remote = tp[4]; - - ASSERT(is_internal_pid(local)); - ASSERT(is_external_pid(remote)); - - /* - * Rewrite buffer to an unlink signal by removing - * second element and change first element to - * DOP_UNLINK. That is, to: {DOP_UNLINK, local, remote} - */ - - ptr = &ep[4]; - switch (*ptr) { - case SMALL_INTEGER_EXT: - ptr += 1; - break; - case INTEGER_EXT: - ptr += 4; - break; - case SMALL_BIG_EXT: - ptr += 1; - ASSERT(*ptr <= 8); - ptr += *ptr + 1; - break; - default: - ERTS_INTERNAL_ERROR("Invalid encoding of DOP_UNLINK_ID signal"); - break; - } - - ASSERT((ptr - ep) <= 16); - ASSERT((ptr - ep) <= iov[2].iov_len); - - *(ptr--) = DOP_UNLINK; - *(ptr--) = SMALL_INTEGER_EXT; - *(ptr--) = 3; - *ptr = SMALL_TUPLE_EXT; - - iov[2].iov_base = ptr; - iov[2].iov_len -= (ptr - ep); - -#ifdef DEBUG - { - ErtsTranscodeDecodeState dbg_tds; - Eterm new_ctl_msg = transcode_decode_ctl_msg(&dbg_tds, - iov, - eiov->vsize); - ASSERT(is_tuple_arity(new_ctl_msg, 3)); - tp = tuple_val(new_ctl_msg); - ASSERT(tp[1] == make_small(DOP_UNLINK)); - ASSERT(tp[2] == local); - ASSERT(eq(tp[3], remote)); - transcode_decode_state_destroy(&dbg_tds); - } -#endif - - /* Send unlink ack to local sender... */ - erts_proc_sig_send_dist_unlink_ack(dep, dep->connection_id, - remote, local, id); - - transcode_decode_state_destroy(&tds); - - reds -= 5; - } - if (reds < 0) - return 0; - return reds; - } start_r = r = reds*ERTS_TRANSCODE_REDS_FACT; |