diff options
author | Rickard Green <rickard@erlang.org> | 2021-11-24 22:50:13 +0100 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2021-12-06 18:01:20 +0100 |
commit | 7241a6a3e6940b2efed5fde39f78ef8b69212b97 (patch) | |
tree | d164c13b060b67fa2137f463ea0431c4dd9b700a /erts/emulator/beam | |
parent | 1a68663cde280a24cee5073f48f3876dfbc3eb75 (diff) | |
download | erlang-7241a6a3e6940b2efed5fde39f78ef8b69212b97.tar.gz |
Introduce outstanding requests limit for system processes
This limit affects system processes that orchestrate operations that
involve all processes in the system. Currently there are two such
processes, the code purger process and the literal area collector
process. They previously sent out requests to all processes on the
system at once, and then waited for responses from these processes in
order to determine when the operation had completed.
When a process had handled such a request it could (and still can)
continue executing Erlang code once it had served this request. If
executing on priority normal or low, it would however not get another
opportunity to execute until all other requests had been served. This
negatively impacted responsiveness of processes during operations like
these on systems with a huge number of processes.
This change limits the number of outstanding requests, which will cause
the system to interleave handling of requests like these with other
execution and thereby improve responsiveness on systems with a huge
number of processes. By default the limit is set to two times the
number of schedulers. This will make sure that there always will be
enough outstanding requests to utilize all schedulers fully during
operations like these while also letting other work execute without
having to wait very long.
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r-- | erts/emulator/beam/atom.names | 1 | ||||
-rw-r--r-- | erts/emulator/beam/beam_bif_load.c | 29 | ||||
-rw-r--r-- | erts/emulator/beam/beam_load.h | 4 | ||||
-rw-r--r-- | erts/emulator/beam/bif.c | 8 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_init.c | 26 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 44 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.h | 76 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 13 |
9 files changed, 179 insertions, 25 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 8753c6e46a..bcf64964ab 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -495,6 +495,7 @@ atom out atom out_exited atom out_exiting atom output +atom outstanding_system_requests_limit atom overlapped_io atom owner atom packet diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 0c95566678..181db30db2 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -72,6 +72,8 @@ static void delete_code(Module* modp); static int any_heap_ref_ptrs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size); static int any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size); +static erts_atomic_t sys_proc_outstanding_req_limit; + static void init_purge_state(void) { @@ -100,12 +102,37 @@ static void init_release_literal_areas(void); void -erts_beam_bif_load_init(void) +erts_beam_bif_load_init(Uint sys_proc_outst_req_lim) { + if (sys_proc_outst_req_lim < 1 || ERTS_MAX_PROCESSES < sys_proc_outst_req_lim) + ERTS_INTERNAL_ERROR("invalid system process outstanding requests limit"); + erts_atomic_init_nob(&sys_proc_outstanding_req_limit, + (erts_aint_t) sys_proc_outst_req_lim); init_release_literal_areas(); init_purge_state(); } +Uint +erts_set_outstanding_system_requests_limit(Uint new_val) +{ + erts_aint_t old_val; + + if (new_val < 1 || ERTS_MAX_PROCESSES < new_val) + return 0; + + old_val = erts_atomic_xchg_nob(&sys_proc_outstanding_req_limit, + (erts_aint_t) new_val); + return (Uint) old_val; +} + +Uint +erts_get_outstanding_system_requests_limit(void) +{ + erts_aint_t val = erts_atomic_read_nob(&sys_proc_outstanding_req_limit); + ASSERT(0 < val && val <= MAX_SMALL); + return (Uint) val; +} + BIF_RETTYPE code_is_module_native_1(BIF_ALIST_1) { Module* modp; diff --git a/erts/emulator/beam/beam_load.h b/erts/emulator/beam/beam_load.h index 156c3c45e2..d899ccb3e4 100644 --- 
a/erts/emulator/beam/beam_load.h +++ b/erts/emulator/beam/beam_load.h @@ -111,7 +111,9 @@ typedef struct beam_code_header { void erts_release_literal_area(struct ErtsLiteralArea_* literal_area); int erts_is_module_native(BeamCodeHeader* code); int erts_is_function_native(ErtsCodeInfo*); -void erts_beam_bif_load_init(void); +void erts_beam_bif_load_init(Uint); +Uint erts_get_outstanding_system_requests_limit(void); +Uint erts_set_outstanding_system_requests_limit(Uint new_val); struct erl_fun_entry; void erts_purge_state_add_fun(struct erl_fun_entry *fe); Export *erts_suspend_process_on_pending_purge_lambda(Process *c_p, diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 761b1e4ec2..9914bb48d7 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -4779,6 +4779,14 @@ BIF_RETTYPE system_flag_2(BIF_ALIST_2) threads); } #endif + } else if (BIF_ARG_1 == am_outstanding_system_requests_limit) { + Uint val; + if (!term_to_Uint(BIF_ARG_2, &val)) + goto error; + val = erts_set_outstanding_system_requests_limit(val); + if (!val) + goto error; + BIF_RET(make_small(val)); } else if (ERTS_IS_ATOM_STR("scheduling_statistics", BIF_ARG_1)) { int what; if (ERTS_IS_ATOM_STR("disable", BIF_ARG_2)) diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 98c9f12bfc..0990c78fee 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -2514,6 +2514,9 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) default: ERTS_INTERNAL_ERROR("Invalid time warp mode"); } + } else if (BIF_ARG_1 == am_outstanding_system_requests_limit) { + Uint val = erts_get_outstanding_system_requests_limit(); + BIF_RET(make_small(val)); } else if (BIF_ARG_1 == am_allocated_areas) { res = erts_allocated_areas(NULL, NULL, BIF_P); BIF_RET(res); diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index f61d265a9a..99ed8da5d6 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ 
-148,6 +148,7 @@ static void erl_init(int ncpu, int port_tab_sz, int port_tab_sz_ignore_files, int legacy_port_tab, + Uint sys_proc_outst_req_lim, int time_correction, ErtsTimeWarpMode time_warp_mode, int node_tab_delete_delay, @@ -302,6 +303,7 @@ erl_init(int ncpu, int port_tab_sz, int port_tab_sz_ignore_files, int legacy_port_tab, + Uint sys_proc_outst_req_lim, int time_correction, ErtsTimeWarpMode time_warp_mode, int node_tab_delete_delay, @@ -359,7 +361,7 @@ erl_init(int ncpu, erts_init_unicode(); /* after RE to get access to PCRE unicode */ erts_init_external(); erts_init_map(); - erts_beam_bif_load_init(); + erts_beam_bif_load_init(sys_proc_outst_req_lim); erts_delay_trap = erts_export_put(am_erlang, am_delay_trap, 2); erts_late_init_process(); #if HAVE_ERTS_MSEG @@ -710,6 +712,9 @@ void erts_usage(void) erts_fprintf(stderr, "-zdntgc time set delayed node table gc in seconds\n"); erts_fprintf(stderr, " valid values are infinity or intergers in the range [0-%d]\n", ERTS_NODE_TAB_DELAY_GC_MAX); + erts_fprintf(stderr, "-zosrl number set outstanding requests limit for system processes,\n"); + erts_fprintf(stderr, " valid range [1-%d]\n", + ERTS_MAX_PROCESSES); #if 0 erts_fprintf(stderr, "-zebwt val set ets busy wait threshold, valid values are:\n"); erts_fprintf(stderr, " none|very_short|short|medium|long|very_long|extremely_long\n"); @@ -1269,6 +1274,7 @@ erl_start(int argc, char **argv) int port_tab_sz_ignore_files = 0; int legacy_proc_tab = 0; int legacy_port_tab = 0; + Uint sys_proc_outst_req_lim; int time_correction; ErtsTimeWarpMode time_warp_mode; int node_tab_delete_delay = ERTS_NODE_TAB_DELAY_GC_DEFAULT; @@ -1310,6 +1316,8 @@ erl_start(int argc, char **argv) erts_error_logger_warnings = am_warning; + sys_proc_outst_req_lim = 2*erts_no_schedulers; + while (i < argc) { if (argv[i][0] != '-') { erts_usage(); @@ -2198,6 +2206,17 @@ erl_start(int argc, char **argv) erts_usage(); } } + else if (has_prefix("osrl", sub_param)) { + long val; + arg = 
get_arg(sub_param+4, argv[i+1], &i); + errno = 0; + val = strtol(arg, NULL, 10); + if (errno != 0 || val < 1 || ERTS_MAX_PROCESSES < val) { + erts_fprintf(stderr, "Invalid outstanding requests limit %s\n", arg); + erts_usage(); + } + sys_proc_outst_req_lim = (Uint) val; + } else { erts_fprintf(stderr, "bad -z option %s\n", argv[i]); erts_usage(); @@ -2277,6 +2296,7 @@ erl_start(int argc, char **argv) port_tab_sz, port_tab_sz_ignore_files, legacy_port_tab, + sys_proc_outst_req_lim, time_correction, time_warp_mode, node_tab_delete_delay, @@ -2301,7 +2321,7 @@ erl_start(int argc, char **argv) pid = erl_system_process_otp(erts_init_process_id, "erts_code_purger", !0, - PRIORITY_NORMAL); + PRIORITY_HIGH); erts_code_purger = (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc, internal_pid_index(pid)); @@ -2310,7 +2330,7 @@ erl_start(int argc, char **argv) pid = erl_system_process_otp(erts_init_process_id, "erts_literal_area_collector", - !0, PRIORITY_NORMAL); + !0, PRIORITY_HIGH); erts_literal_area_collector = (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc, internal_pid_index(pid)); diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 953efd812e..b92e3586d9 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -735,7 +735,7 @@ void erts_proc_sig_send_pending(ErtsSchedulerData* esdp) } static int -maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) +maybe_elevate_sig_handling_prio(Process *c_p, int prio, Eterm other) { /* * returns: @@ -745,22 +745,29 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) */ int res; Process *rp; - erts_aint32_t state, my_prio, other_prio; rp = erts_proc_lookup_raw(other); if (!rp) res = 0; else { + erts_aint32_t state, min_prio, other_prio; res = -1; - state = erts_atomic32_read_nob(&c_p->state); - my_prio = ERTS_PSFLGS_GET_USR_PRIO(state); + if (prio >= 0) + min_prio = prio; + else { + /* inherit from caller... 
*/ + state = erts_atomic32_read_nob(&c_p->state); + min_prio = ERTS_PSFLGS_GET_USR_PRIO(state); + } + + ASSERT(PRIORITY_MAX <= min_prio && min_prio <= PRIORITY_LOW); state = erts_atomic32_read_nob(&rp->state); other_prio = ERTS_PSFLGS_GET_USR_PRIO(state); - if (other_prio > my_prio) { - /* Others prio is lower than mine; elevate it... */ - res = !!erts_sig_prio(other, my_prio); + if (other_prio > min_prio) { + /* Others prio is lower than min prio; elevate it... */ + res = !!erts_sig_prio(other, min_prio); if (res) { /* ensure handled if dirty executing... */ state = erts_atomic32_read_nob(&rp->state); @@ -770,7 +777,7 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) * in erl_process.c. */ if (state & ERTS_PSFLG_DIRTY_RUNNING) - erts_make_dirty_proc_handled(other, state, my_prio); + erts_make_dirty_proc_handled(other, state, min_prio); } } } @@ -1586,7 +1593,7 @@ erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref) destroy_sig_group_leader(sgl); else if (c_p) { erts_aint_t flags, rm_flags = ERTS_SIG_GL_FLG_SENDER; - int prio_res = maybe_elevate_sig_handling_prio(c_p, to); + int prio_res = maybe_elevate_sig_handling_prio(c_p, -1, to); if (!prio_res) rm_flags |= ERTS_SIG_GL_FLG_ACTIVE; flags = erts_atomic_read_band_nob(&sgl->flags, ~rm_flags); @@ -1635,7 +1642,7 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref) 0); if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE)) { - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, -1, to); return !0; } else { @@ -1694,7 +1701,7 @@ erts_proc_sig_send_process_info_request(Process *c_p, res = proc_queue_signal(c_p, to, (ErtsSignal *) pis, ERTS_SIG_Q_OP_PROCESS_INFO); if (res) - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, -1, to); else erts_free(ERTS_ALC_T_SIG_DATA, pis); return res; @@ -1741,7 +1748,7 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm 
to, Eterm tag, Eterm reply) 0); if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND)) - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, -1, to); else { Eterm *tp; /* It wasn't alive; reply to ourselves... */ @@ -1761,6 +1768,17 @@ erts_proc_sig_send_rpc_request(Process *c_p, Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), void *arg) { + return erts_proc_sig_send_rpc_request_prio(c_p, to, reply, func, arg, -1); +} + +Eterm +erts_proc_sig_send_rpc_request_prio(Process *c_p, + Eterm to, + int reply, + Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), + void *arg, + int prio) +{ Eterm res; ErtsProcSigRPC *sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsProcSigRPC)); @@ -1789,7 +1807,7 @@ erts_proc_sig_send_rpc_request(Process *c_p, } if (proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_RPC)) - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, prio, to); else { erts_free(ERTS_ALC_T_SIG_DATA, sig); res = THE_NON_VALUE; diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index d45c6af776..04c00eb280 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -678,6 +678,9 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, * exist. The signal was not sent, and no specific * receive has to be entered by the caller. * + * Minimum priority, that the signal will execute under, + * will equal the priority of the calling process (c_p). + * * @param[in] c_p Pointer to process struct of * currently executing process. * @@ -713,6 +716,79 @@ erts_proc_sig_send_rpc_request(Process *c_p, int reply, Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), void *arg); +/** + * + * @brief Send an 'rpc' signal to a process. + * + * The function 'func' will be executed in the + * context of the receiving process. 
A response + * message '{Ref, Result}' is sent to the sender + * when 'func' has been called. 'Ref' is the reference + * returned by this function and 'Result' is the + * term returned by 'func'. If the return value of + * 'func' is not an immediate term, 'func' has to + * allocate a heap fragment where the result is stored + * and update the the heap fragment pointer pointer + * passed as third argument to point to it. + * + * If this function returns a reference, 'func' will + * be called in the context of the receiver. However, + * note that this might happen when the receiver is in + * an exiting state. The caller of this function + * *unconditionally* has to enter a receive that match + * on the returned reference in all clauses as next + * receive; otherwise, bad things will happen! + * + * If THE_NON_VALUE is returned, the receiver did not + * exist. The signal was not sent, and no specific + * receive has to be entered by the caller. + * + * @param[in] c_p Pointer to process struct of + * currently executing process. + * + * @param[in] to Identifier of receiver process. + * + * @param[in] reply Non-zero if a reply is wanted. + * + * @param[in] func Function to execute in the + * context of the receiver. + * First argument will be a + * pointer to the process struct + * of the receiver process. + * Second argument will be 'arg' + * (see below). Third argument + * will be a pointer to a pointer + * to a heap fragment for storage + * of result returned from 'func' + * (i.e. an 'out' parameter). + * + * @param[in] arg Void pointer to argument + * to pass as second argument + * in call of 'func'. + * + * @param[in] prio Minimum priority that the + * signal will execute under. + * Either PRIORITY_MAX, + * PRIORITY_HIGH, PRIORITY_NORMAL, + * PRIORITY_LOW, or a negative + * value. A negative value will + * cause a minimum priority that + * equals the priority of the + * calling process (c_p). 
+ * + * @returns If the request was sent, + * an internal ordinary + * reference; otherwise, + * THE_NON_VALUE (non-existing + * receiver). + */ +Eterm +erts_proc_sig_send_rpc_request_prio(Process *c_p, + Eterm to, + int reply, + Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), + void *arg, + int prio); /* * End of send operations of currently supported process signals. diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 017845541b..dac6e8fa35 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -10914,8 +10914,6 @@ request_system_task(Process *c_p, Eterm requester, Eterm target, if (signal) { erts_aint32_t state; - if (priority_req != am_inherit) - goto badarg; state = erts_atomic32_read_acqb(&rp->state); if (state & fail_state & ERTS_PSFLG_EXITING) goto noproc; @@ -10924,11 +10922,12 @@ request_system_task(Process *c_p, Eterm requester, Eterm target, * Send rpc request signal without reply, * and reply from the system task... */ - Eterm res = erts_proc_sig_send_rpc_request(c_p, - target, - 0, /* no reply */ - sched_sig_sys_task, - (void *) st); + Eterm res = erts_proc_sig_send_rpc_request_prio(c_p, + target, + 0, /* no reply */ + sched_sig_sys_task, + (void *) st, + prio); if (is_non_value(res)) goto noproc; return ret; /* signal sent... */ |