diff options
author | Rickard Green <rickard@erlang.org> | 2021-12-06 18:02:02 +0100 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2021-12-06 18:02:02 +0100 |
commit | 2a7507389949b3f92355cccf12d3e51612692cb3 (patch) | |
tree | 3fa4483c1c89af5609362dae8f65a3327f71e483 /erts/emulator/beam | |
parent | 3002f55f409f44122d3a45ac53bfd453e9aa2cb2 (diff) | |
parent | 7241a6a3e6940b2efed5fde39f78ef8b69212b97 (diff) | |
download | erlang-2a7507389949b3f92355cccf12d3e51612692cb3.tar.gz |
Merge branch 'rickard/outstanding-cpc-cla-limit/22.3.4/OTP-17796' into rickard/outstanding-cpc-cla-limit/23.3.4/OTP-17796
* rickard/outstanding-cpc-cla-limit/22.3.4/OTP-17796:
Introduce outstanding requests limit for system processes
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r-- | erts/emulator/beam/atom.names | 1 | ||||
-rw-r--r-- | erts/emulator/beam/beam_bif_load.c | 29 | ||||
-rw-r--r-- | erts/emulator/beam/beam_load.h | 4 | ||||
-rw-r--r-- | erts/emulator/beam/bif.c | 8 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_init.c | 26 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 44 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.h | 76 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 13 |
9 files changed, 179 insertions, 25 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 65f78f5541..3f6795e75b 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -505,6 +505,7 @@ atom out atom out_exited atom out_exiting atom output +atom outstanding_system_requests_limit atom overlapped_io atom owner atom packet diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 9a3b37982a..467a6c2f4f 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -72,6 +72,8 @@ static void delete_code(Module* modp); static int any_heap_ref_ptrs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size); static int any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size); +static erts_atomic_t sys_proc_outstanding_req_limit; + static void init_purge_state(void) { @@ -100,12 +102,37 @@ static void init_release_literal_areas(void); void -erts_beam_bif_load_init(void) +erts_beam_bif_load_init(Uint sys_proc_outst_req_lim) { + if (sys_proc_outst_req_lim < 1 || ERTS_MAX_PROCESSES < sys_proc_outst_req_lim) + ERTS_INTERNAL_ERROR("invalid system process outstanding requests limit"); + erts_atomic_init_nob(&sys_proc_outstanding_req_limit, + (erts_aint_t) sys_proc_outst_req_lim); init_release_literal_areas(); init_purge_state(); } +Uint +erts_set_outstanding_system_requests_limit(Uint new_val) +{ + erts_aint_t old_val; + + if (new_val < 1 || ERTS_MAX_PROCESSES < new_val) + return 0; + + old_val = erts_atomic_xchg_nob(&sys_proc_outstanding_req_limit, + (erts_aint_t) new_val); + return (Uint) old_val; +} + +Uint +erts_get_outstanding_system_requests_limit(void) +{ + erts_aint_t val = erts_atomic_read_nob(&sys_proc_outstanding_req_limit); + ASSERT(0 < val && val <= MAX_SMALL); + return (Uint) val; +} + BIF_RETTYPE code_is_module_native_1(BIF_ALIST_1) { Module* modp; diff --git a/erts/emulator/beam/beam_load.h b/erts/emulator/beam/beam_load.h index e7127c5b08..44963ba067 100644 --- a/erts/emulator/beam/beam_load.h +++ b/erts/emulator/beam/beam_load.h @@ -111,7 +111,9 @@ typedef struct beam_code_header { void erts_release_literal_area(struct ErtsLiteralArea_* literal_area); int erts_is_module_native(BeamCodeHeader* code); int erts_is_function_native(ErtsCodeInfo*); -void erts_beam_bif_load_init(void); +void erts_beam_bif_load_init(Uint); +Uint erts_get_outstanding_system_requests_limit(void); +Uint erts_set_outstanding_system_requests_limit(Uint new_val); struct erl_fun_entry; void erts_purge_state_add_fun(struct erl_fun_entry *fe); Export *erts_suspend_process_on_pending_purge_lambda(Process *c_p, diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index eefb722c56..7b549c015a 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -4899,6 +4899,14 @@ BIF_RETTYPE system_flag_2(BIF_ALIST_2) threads); } #endif + } else if (BIF_ARG_1 == am_outstanding_system_requests_limit) { + Uint val; + if (!term_to_Uint(BIF_ARG_2, &val)) + goto error; + val = erts_set_outstanding_system_requests_limit(val); + if (!val) + goto error; + BIF_RET(make_small(val)); } else if (ERTS_IS_ATOM_STR("scheduling_statistics", BIF_ARG_1)) { int what; if (ERTS_IS_ATOM_STR("disable", BIF_ARG_2)) diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 41c0fb82fa..b23e8e7b16 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -2588,6 +2588,9 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) default: ERTS_INTERNAL_ERROR("Invalid time warp mode"); } + } else if (BIF_ARG_1 == am_outstanding_system_requests_limit) { + Uint val = erts_get_outstanding_system_requests_limit(); + BIF_RET(make_small(val)); } else if (BIF_ARG_1 == am_allocated_areas) { res = erts_allocated_areas(NULL, NULL, BIF_P); BIF_RET(res); diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index 8c8e0c9862..0d28ba80f4 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -148,6 +148,7 @@ static void erl_init(int ncpu, int port_tab_sz, int port_tab_sz_ignore_files, int legacy_port_tab, + Uint sys_proc_outst_req_lim, int time_correction, ErtsTimeWarpMode time_warp_mode, int node_tab_delete_delay, @@ -302,6 +303,7 @@ erl_init(int ncpu, int port_tab_sz, int port_tab_sz_ignore_files, int legacy_port_tab, + Uint sys_proc_outst_req_lim, int time_correction, ErtsTimeWarpMode time_warp_mode, int node_tab_delete_delay, @@ -359,7 +361,7 @@ erl_init(int ncpu, erts_init_unicode(); /* after RE to get access to PCRE unicode */ erts_init_external(); erts_init_map(); - erts_beam_bif_load_init(); + erts_beam_bif_load_init(sys_proc_outst_req_lim); erts_delay_trap = erts_export_put(am_erlang, am_delay_trap, 2); erts_late_init_process(); #if HAVE_ERTS_MSEG @@ -713,6 +715,9 @@ void erts_usage(void) erts_fprintf(stderr, "-zdntgc time set delayed node table gc in seconds\n"); erts_fprintf(stderr, " valid values are infinity or intergers in the range [0-%d]\n", ERTS_NODE_TAB_DELAY_GC_MAX); + erts_fprintf(stderr, "-zosrl number set outstanding requests limit for system processes,\n"); + erts_fprintf(stderr, " valid range [1-%d]\n", + ERTS_MAX_PROCESSES); #if 0 erts_fprintf(stderr, "-zebwt val set ets busy wait threshold, valid values are:\n"); erts_fprintf(stderr, " none|very_short|short|medium|long|very_long|extremely_long\n"); @@ -1286,6 +1291,7 @@ erl_start(int argc, char **argv) int port_tab_sz_ignore_files = 0; int legacy_proc_tab = 0; int legacy_port_tab = 0; + Uint sys_proc_outst_req_lim; int time_correction; ErtsTimeWarpMode time_warp_mode; int node_tab_delete_delay = ERTS_NODE_TAB_DELAY_GC_DEFAULT; @@ -1327,6 +1333,8 @@ erl_start(int argc, char **argv) erts_error_logger_warnings = am_warning; + sys_proc_outst_req_lim = 2*erts_no_schedulers; + while (i < argc) { if (argv[i][0] != '-') { erts_usage(); @@ -2215,6 +2223,17 @@ erl_start(int argc, char **argv) erts_usage(); } } + else if (has_prefix("osrl", sub_param)) { + long val; + arg = get_arg(sub_param+4, argv[i+1], &i); + errno = 0; + val = strtol(arg, NULL, 10); + if (errno != 0 || val < 1 || ERTS_MAX_PROCESSES < val) { + erts_fprintf(stderr, "Invalid outstanding requests limit %s\n", arg); + erts_usage(); + } + sys_proc_outst_req_lim = (Uint) val; + } else { erts_fprintf(stderr, "bad -z option %s\n", argv[i]); erts_usage(); @@ -2294,6 +2313,7 @@ erl_start(int argc, char **argv) port_tab_sz, port_tab_sz_ignore_files, legacy_port_tab, + sys_proc_outst_req_lim, time_correction, time_warp_mode, node_tab_delete_delay, @@ -2318,7 +2338,7 @@ erl_start(int argc, char **argv) pid = erl_system_process_otp(erts_init_process_id, "erts_code_purger", !0, - PRIORITY_NORMAL); + PRIORITY_HIGH); erts_code_purger = (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc, internal_pid_index(pid)); @@ -2327,7 +2347,7 @@ erl_start(int argc, char **argv) pid = erl_system_process_otp(erts_init_process_id, "erts_literal_area_collector", - !0, PRIORITY_NORMAL); + !0, PRIORITY_HIGH); erts_literal_area_collector = (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc, internal_pid_index(pid)); diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index ad5610d817..e40fdad3e0 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -767,7 +767,7 @@ void erts_proc_sig_send_pending(ErtsSchedulerData* esdp) } static int -maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) +maybe_elevate_sig_handling_prio(Process *c_p, int prio, Eterm other) { /* * returns: @@ -777,22 +777,29 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) */ int res; Process *rp; - erts_aint32_t state, my_prio, other_prio; rp = erts_proc_lookup_raw(other); if (!rp) res = 0; else { + erts_aint32_t state, min_prio, other_prio; res = -1; - state = erts_atomic32_read_nob(&c_p->state); - my_prio = ERTS_PSFLGS_GET_USR_PRIO(state); + if (prio >= 0) + min_prio = prio; + else { + /* inherit from caller... */ + state = erts_atomic32_read_nob(&c_p->state); + min_prio = ERTS_PSFLGS_GET_USR_PRIO(state); + } + + ASSERT(PRIORITY_MAX <= min_prio && min_prio <= PRIORITY_LOW); state = erts_atomic32_read_nob(&rp->state); other_prio = ERTS_PSFLGS_GET_USR_PRIO(state); - if (other_prio > my_prio) { - /* Others prio is lower than mine; elevate it... */ - res = !!erts_sig_prio(other, my_prio); + if (other_prio > min_prio) { + /* Others prio is lower than min prio; elevate it... */ + res = !!erts_sig_prio(other, min_prio); if (res) { /* ensure handled if dirty executing... */ state = erts_atomic32_read_nob(&rp->state); @@ -802,7 +809,7 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) * in erl_process.c. */ if (state & ERTS_PSFLG_DIRTY_RUNNING) - erts_make_dirty_proc_handled(other, state, my_prio); + erts_make_dirty_proc_handled(other, state, min_prio); } } } @@ -1764,7 +1771,7 @@ erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref) destroy_sig_group_leader(sgl); else if (c_p) { erts_aint_t flags, rm_flags = ERTS_SIG_GL_FLG_SENDER; - int prio_res = maybe_elevate_sig_handling_prio(c_p, to); + int prio_res = maybe_elevate_sig_handling_prio(c_p, -1, to); if (!prio_res) rm_flags |= ERTS_SIG_GL_FLG_ACTIVE; flags = erts_atomic_read_band_nob(&sgl->flags, ~rm_flags); @@ -1813,7 +1820,7 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref) 0); if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE)) { - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, -1, to); return !0; } else { @@ -1872,7 +1879,7 @@ erts_proc_sig_send_process_info_request(Process *c_p, res = proc_queue_signal(c_p, to, (ErtsSignal *) pis, ERTS_SIG_Q_OP_PROCESS_INFO); if (res) - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, -1, to); else erts_free(ERTS_ALC_T_SIG_DATA, pis); return res; @@ -1919,7 +1926,7 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply) 0); if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND)) - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, -1, to); else { Eterm *tp; /* It wasn't alive; reply to ourselves... */ @@ -2037,6 +2044,17 @@ erts_proc_sig_send_rpc_request(Process *c_p, Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), void *arg) { + return erts_proc_sig_send_rpc_request_prio(c_p, to, reply, func, arg, -1); +} + +Eterm +erts_proc_sig_send_rpc_request_prio(Process *c_p, + Eterm to, + int reply, + Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), + void *arg, + int prio) +{ Eterm res; ErtsProcSigRPC *sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsProcSigRPC)); @@ -2065,7 +2083,7 @@ erts_proc_sig_send_rpc_request(Process *c_p, } if (proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_RPC)) - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, prio, to); else { erts_free(ERTS_ALC_T_SIG_DATA, sig); res = THE_NON_VALUE; diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index 0b21751e04..0aebf37d00 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -789,6 +789,9 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, * exist. The signal was not sent, and no specific * receive has to be entered by the caller. * + * Minimum priority, that the signal will execute under, + * will equal the priority of the calling process (c_p). + * * @param[in] c_p Pointer to process struct of * currently executing process. * @@ -824,6 +827,79 @@ erts_proc_sig_send_rpc_request(Process *c_p, int reply, Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), void *arg); +/** + * + * @brief Send an 'rpc' signal to a process. + * + * The function 'func' will be executed in the + * context of the receiving process. A response + * message '{Ref, Result}' is sent to the sender + * when 'func' has been called. 'Ref' is the reference + * returned by this function and 'Result' is the + * term returned by 'func'. If the return value of + * 'func' is not an immediate term, 'func' has to + * allocate a heap fragment where the result is stored + * and update the the heap fragment pointer pointer + * passed as third argument to point to it. + * + * If this function returns a reference, 'func' will + * be called in the context of the receiver. However, + * note that this might happen when the receiver is in + * an exiting state. The caller of this function + * *unconditionally* has to enter a receive that match + * on the returned reference in all clauses as next + * receive; otherwise, bad things will happen! + * + * If THE_NON_VALUE is returned, the receiver did not + * exist. The signal was not sent, and no specific + * receive has to be entered by the caller. + * + * @param[in] c_p Pointer to process struct of + * currently executing process. + * + * @param[in] to Identifier of receiver process. + * + * @param[in] reply Non-zero if a reply is wanted. + * + * @param[in] func Function to execute in the + * context of the receiver. + * First argument will be a + * pointer to the process struct + * of the receiver process. + * Second argument will be 'arg' + * (see below). Third argument + * will be a pointer to a pointer + * to a heap fragment for storage + * of result returned from 'func' + * (i.e. an 'out' parameter). + * + * @param[in] arg Void pointer to argument + * to pass as second argument + * in call of 'func'. + * + * @param[in] prio Minimum priority that the + * signal will execute under. + * Either PRIORITY_MAX, + * PRIORITY_HIGH, PRIORITY_NORMAL, + * PRIORITY_LOW, or a negative + * value. A negative value will + * cause a minimum priority that + * equals the priority of the + * calling process (c_p). + * + * @returns If the request was sent, + * an internal ordinary + * reference; otherwise, + * THE_NON_VALUE (non-existing + * receiver). + */ +Eterm +erts_proc_sig_send_rpc_request_prio(Process *c_p, + Eterm to, + int reply, + Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), + void *arg, + int prio); int erts_proc_sig_send_dist_spawn_reply(Eterm node, diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 65a6e1c6ad..feff6ca8c8 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -10916,8 +10916,6 @@ request_system_task(Process *c_p, Eterm requester, Eterm target, if (signal) { erts_aint32_t state; - if (priority_req != am_inherit) - goto badarg; state = erts_atomic32_read_acqb(&rp->state); if (state & fail_state & ERTS_PSFLG_EXITING) goto noproc; @@ -10926,11 +10924,12 @@ request_system_task(Process *c_p, Eterm requester, Eterm target, * Send rpc request signal without reply, * and reply from the system task... */ - Eterm res = erts_proc_sig_send_rpc_request(c_p, - target, - 0, /* no reply */ - sched_sig_sys_task, - (void *) st); + Eterm res = erts_proc_sig_send_rpc_request_prio(c_p, + target, + 0, /* no reply */ + sched_sig_sys_task, + (void *) st, + prio); if (is_non_value(res)) goto noproc; return ret; /* signal sent... */ |