summaryrefslogtreecommitdiff
path: root/erts/emulator/beam
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/atom.names1
-rw-r--r--erts/emulator/beam/beam_bif_load.c29
-rw-r--r--erts/emulator/beam/beam_load.h4
-rw-r--r--erts/emulator/beam/bif.c8
-rw-r--r--erts/emulator/beam/erl_bif_info.c3
-rw-r--r--erts/emulator/beam/erl_init.c26
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c44
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h76
-rw-r--r--erts/emulator/beam/erl_process.c13
9 files changed, 179 insertions, 25 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index 65f78f5541..3f6795e75b 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -505,6 +505,7 @@ atom out
atom out_exited
atom out_exiting
atom output
+atom outstanding_system_requests_limit
atom overlapped_io
atom owner
atom packet
diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c
index 9a3b37982a..467a6c2f4f 100644
--- a/erts/emulator/beam/beam_bif_load.c
+++ b/erts/emulator/beam/beam_bif_load.c
@@ -72,6 +72,8 @@ static void delete_code(Module* modp);
static int any_heap_ref_ptrs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size);
static int any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size);
+static erts_atomic_t sys_proc_outstanding_req_limit;
+
static void
init_purge_state(void)
{
@@ -100,12 +102,37 @@ static void
init_release_literal_areas(void);
void
-erts_beam_bif_load_init(void)
+erts_beam_bif_load_init(Uint sys_proc_outst_req_lim)
{
+ if (sys_proc_outst_req_lim < 1 || ERTS_MAX_PROCESSES < sys_proc_outst_req_lim)
+ ERTS_INTERNAL_ERROR("invalid system process outstanding requests limit");
+ erts_atomic_init_nob(&sys_proc_outstanding_req_limit,
+ (erts_aint_t) sys_proc_outst_req_lim);
init_release_literal_areas();
init_purge_state();
}
+Uint
+erts_set_outstanding_system_requests_limit(Uint new_val)
+{
+ erts_aint_t old_val;
+
+ if (new_val < 1 || ERTS_MAX_PROCESSES < new_val)
+ return 0;
+
+ old_val = erts_atomic_xchg_nob(&sys_proc_outstanding_req_limit,
+ (erts_aint_t) new_val);
+ return (Uint) old_val;
+}
+
+Uint
+erts_get_outstanding_system_requests_limit(void)
+{
+ erts_aint_t val = erts_atomic_read_nob(&sys_proc_outstanding_req_limit);
+ ASSERT(0 < val && val <= MAX_SMALL);
+ return (Uint) val;
+}
+
BIF_RETTYPE code_is_module_native_1(BIF_ALIST_1)
{
Module* modp;
diff --git a/erts/emulator/beam/beam_load.h b/erts/emulator/beam/beam_load.h
index e7127c5b08..44963ba067 100644
--- a/erts/emulator/beam/beam_load.h
+++ b/erts/emulator/beam/beam_load.h
@@ -111,7 +111,9 @@ typedef struct beam_code_header {
void erts_release_literal_area(struct ErtsLiteralArea_* literal_area);
int erts_is_module_native(BeamCodeHeader* code);
int erts_is_function_native(ErtsCodeInfo*);
-void erts_beam_bif_load_init(void);
+void erts_beam_bif_load_init(Uint);
+Uint erts_get_outstanding_system_requests_limit(void);
+Uint erts_set_outstanding_system_requests_limit(Uint new_val);
struct erl_fun_entry;
void erts_purge_state_add_fun(struct erl_fun_entry *fe);
Export *erts_suspend_process_on_pending_purge_lambda(Process *c_p,
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index eefb722c56..7b549c015a 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -4899,6 +4899,14 @@ BIF_RETTYPE system_flag_2(BIF_ALIST_2)
threads);
}
#endif
+ } else if (BIF_ARG_1 == am_outstanding_system_requests_limit) {
+ Uint val;
+ if (!term_to_Uint(BIF_ARG_2, &val))
+ goto error;
+ val = erts_set_outstanding_system_requests_limit(val);
+ if (!val)
+ goto error;
+ BIF_RET(make_small(val));
} else if (ERTS_IS_ATOM_STR("scheduling_statistics", BIF_ARG_1)) {
int what;
if (ERTS_IS_ATOM_STR("disable", BIF_ARG_2))
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 41c0fb82fa..b23e8e7b16 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -2588,6 +2588,9 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1)
default:
ERTS_INTERNAL_ERROR("Invalid time warp mode");
}
+ } else if (BIF_ARG_1 == am_outstanding_system_requests_limit) {
+ Uint val = erts_get_outstanding_system_requests_limit();
+ BIF_RET(make_small(val));
} else if (BIF_ARG_1 == am_allocated_areas) {
res = erts_allocated_areas(NULL, NULL, BIF_P);
BIF_RET(res);
diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c
index 8c8e0c9862..0d28ba80f4 100644
--- a/erts/emulator/beam/erl_init.c
+++ b/erts/emulator/beam/erl_init.c
@@ -148,6 +148,7 @@ static void erl_init(int ncpu,
int port_tab_sz,
int port_tab_sz_ignore_files,
int legacy_port_tab,
+ Uint sys_proc_outst_req_lim,
int time_correction,
ErtsTimeWarpMode time_warp_mode,
int node_tab_delete_delay,
@@ -302,6 +303,7 @@ erl_init(int ncpu,
int port_tab_sz,
int port_tab_sz_ignore_files,
int legacy_port_tab,
+ Uint sys_proc_outst_req_lim,
int time_correction,
ErtsTimeWarpMode time_warp_mode,
int node_tab_delete_delay,
@@ -359,7 +361,7 @@ erl_init(int ncpu,
erts_init_unicode(); /* after RE to get access to PCRE unicode */
erts_init_external();
erts_init_map();
- erts_beam_bif_load_init();
+ erts_beam_bif_load_init(sys_proc_outst_req_lim);
erts_delay_trap = erts_export_put(am_erlang, am_delay_trap, 2);
erts_late_init_process();
#if HAVE_ERTS_MSEG
@@ -713,6 +715,9 @@ void erts_usage(void)
erts_fprintf(stderr, "-zdntgc time set delayed node table gc in seconds\n");
erts_fprintf(stderr, " valid values are infinity or intergers in the range [0-%d]\n",
ERTS_NODE_TAB_DELAY_GC_MAX);
+ erts_fprintf(stderr, "-zosrl number set outstanding requests limit for system processes,\n");
+ erts_fprintf(stderr, " valid range [1-%d]\n",
+ ERTS_MAX_PROCESSES);
#if 0
erts_fprintf(stderr, "-zebwt val set ets busy wait threshold, valid values are:\n");
erts_fprintf(stderr, " none|very_short|short|medium|long|very_long|extremely_long\n");
@@ -1286,6 +1291,7 @@ erl_start(int argc, char **argv)
int port_tab_sz_ignore_files = 0;
int legacy_proc_tab = 0;
int legacy_port_tab = 0;
+ Uint sys_proc_outst_req_lim;
int time_correction;
ErtsTimeWarpMode time_warp_mode;
int node_tab_delete_delay = ERTS_NODE_TAB_DELAY_GC_DEFAULT;
@@ -1327,6 +1333,8 @@ erl_start(int argc, char **argv)
erts_error_logger_warnings = am_warning;
+ sys_proc_outst_req_lim = 2*erts_no_schedulers;
+
while (i < argc) {
if (argv[i][0] != '-') {
erts_usage();
@@ -2215,6 +2223,17 @@ erl_start(int argc, char **argv)
erts_usage();
}
}
+ else if (has_prefix("osrl", sub_param)) {
+ long val;
+ arg = get_arg(sub_param+4, argv[i+1], &i);
+ errno = 0;
+ val = strtol(arg, NULL, 10);
+ if (errno != 0 || val < 1 || ERTS_MAX_PROCESSES < val) {
+ erts_fprintf(stderr, "Invalid outstanding requests limit %s\n", arg);
+ erts_usage();
+ }
+ sys_proc_outst_req_lim = (Uint) val;
+ }
else {
erts_fprintf(stderr, "bad -z option %s\n", argv[i]);
erts_usage();
@@ -2294,6 +2313,7 @@ erl_start(int argc, char **argv)
port_tab_sz,
port_tab_sz_ignore_files,
legacy_port_tab,
+ sys_proc_outst_req_lim,
time_correction,
time_warp_mode,
node_tab_delete_delay,
@@ -2318,7 +2338,7 @@ erl_start(int argc, char **argv)
pid = erl_system_process_otp(erts_init_process_id,
"erts_code_purger", !0,
- PRIORITY_NORMAL);
+ PRIORITY_HIGH);
erts_code_purger
= (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc,
internal_pid_index(pid));
@@ -2327,7 +2347,7 @@ erl_start(int argc, char **argv)
pid = erl_system_process_otp(erts_init_process_id,
"erts_literal_area_collector",
- !0, PRIORITY_NORMAL);
+ !0, PRIORITY_HIGH);
erts_literal_area_collector
= (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc,
internal_pid_index(pid));
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index ad5610d817..e40fdad3e0 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -767,7 +767,7 @@ void erts_proc_sig_send_pending(ErtsSchedulerData* esdp)
}
static int
-maybe_elevate_sig_handling_prio(Process *c_p, Eterm other)
+maybe_elevate_sig_handling_prio(Process *c_p, int prio, Eterm other)
{
/*
* returns:
@@ -777,22 +777,29 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other)
*/
int res;
Process *rp;
- erts_aint32_t state, my_prio, other_prio;
rp = erts_proc_lookup_raw(other);
if (!rp)
res = 0;
else {
+ erts_aint32_t state, min_prio, other_prio;
res = -1;
- state = erts_atomic32_read_nob(&c_p->state);
- my_prio = ERTS_PSFLGS_GET_USR_PRIO(state);
+ if (prio >= 0)
+ min_prio = prio;
+ else {
+ /* inherit from caller... */
+ state = erts_atomic32_read_nob(&c_p->state);
+ min_prio = ERTS_PSFLGS_GET_USR_PRIO(state);
+ }
+
+ ASSERT(PRIORITY_MAX <= min_prio && min_prio <= PRIORITY_LOW);
state = erts_atomic32_read_nob(&rp->state);
other_prio = ERTS_PSFLGS_GET_USR_PRIO(state);
- if (other_prio > my_prio) {
- /* Others prio is lower than mine; elevate it... */
- res = !!erts_sig_prio(other, my_prio);
+ if (other_prio > min_prio) {
+ /* Others prio is lower than min prio; elevate it... */
+ res = !!erts_sig_prio(other, min_prio);
if (res) {
/* ensure handled if dirty executing... */
state = erts_atomic32_read_nob(&rp->state);
@@ -802,7 +809,7 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other)
* in erl_process.c.
*/
if (state & ERTS_PSFLG_DIRTY_RUNNING)
- erts_make_dirty_proc_handled(other, state, my_prio);
+ erts_make_dirty_proc_handled(other, state, min_prio);
}
}
}
@@ -1764,7 +1771,7 @@ erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref)
destroy_sig_group_leader(sgl);
else if (c_p) {
erts_aint_t flags, rm_flags = ERTS_SIG_GL_FLG_SENDER;
- int prio_res = maybe_elevate_sig_handling_prio(c_p, to);
+ int prio_res = maybe_elevate_sig_handling_prio(c_p, -1, to);
if (!prio_res)
rm_flags |= ERTS_SIG_GL_FLG_ACTIVE;
flags = erts_atomic_read_band_nob(&sgl->flags, ~rm_flags);
@@ -1813,7 +1820,7 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref)
0);
if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE)) {
- (void) maybe_elevate_sig_handling_prio(c_p, to);
+ (void) maybe_elevate_sig_handling_prio(c_p, -1, to);
return !0;
}
else {
@@ -1872,7 +1879,7 @@ erts_proc_sig_send_process_info_request(Process *c_p,
res = proc_queue_signal(c_p, to, (ErtsSignal *) pis,
ERTS_SIG_Q_OP_PROCESS_INFO);
if (res)
- (void) maybe_elevate_sig_handling_prio(c_p, to);
+ (void) maybe_elevate_sig_handling_prio(c_p, -1, to);
else
erts_free(ERTS_ALC_T_SIG_DATA, pis);
return res;
@@ -1919,7 +1926,7 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply)
0);
if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND))
- (void) maybe_elevate_sig_handling_prio(c_p, to);
+ (void) maybe_elevate_sig_handling_prio(c_p, -1, to);
else {
Eterm *tp;
/* It wasn't alive; reply to ourselves... */
@@ -2037,6 +2044,17 @@ erts_proc_sig_send_rpc_request(Process *c_p,
Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
void *arg)
{
+ return erts_proc_sig_send_rpc_request_prio(c_p, to, reply, func, arg, -1);
+}
+
+Eterm
+erts_proc_sig_send_rpc_request_prio(Process *c_p,
+ Eterm to,
+ int reply,
+ Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
+ void *arg,
+ int prio)
+{
Eterm res;
ErtsProcSigRPC *sig = erts_alloc(ERTS_ALC_T_SIG_DATA,
sizeof(ErtsProcSigRPC));
@@ -2065,7 +2083,7 @@ erts_proc_sig_send_rpc_request(Process *c_p,
}
if (proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_RPC))
- (void) maybe_elevate_sig_handling_prio(c_p, to);
+ (void) maybe_elevate_sig_handling_prio(c_p, prio, to);
else {
erts_free(ERTS_ALC_T_SIG_DATA, sig);
res = THE_NON_VALUE;
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index 0b21751e04..0aebf37d00 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -789,6 +789,9 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to,
* exist. The signal was not sent, and no specific
* receive has to be entered by the caller.
*
+ * Minimum priority, that the signal will execute under,
+ * will equal the priority of the calling process (c_p).
+ *
* @param[in] c_p Pointer to process struct of
* currently executing process.
*
@@ -824,6 +827,79 @@ erts_proc_sig_send_rpc_request(Process *c_p,
int reply,
Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
void *arg);
+/**
+ *
+ * @brief Send an 'rpc' signal to a process.
+ *
+ * The function 'func' will be executed in the
+ * context of the receiving process. A response
+ * message '{Ref, Result}' is sent to the sender
+ * when 'func' has been called. 'Ref' is the reference
+ * returned by this function and 'Result' is the
+ * term returned by 'func'. If the return value of
+ * 'func' is not an immediate term, 'func' has to
+ * allocate a heap fragment where the result is stored
+ * and update the the heap fragment pointer pointer
+ * passed as third argument to point to it.
+ *
+ * If this function returns a reference, 'func' will
+ * be called in the context of the receiver. However,
+ * note that this might happen when the receiver is in
+ * an exiting state. The caller of this function
+ * *unconditionally* has to enter a receive that match
+ * on the returned reference in all clauses as next
+ * receive; otherwise, bad things will happen!
+ *
+ * If THE_NON_VALUE is returned, the receiver did not
+ * exist. The signal was not sent, and no specific
+ * receive has to be entered by the caller.
+ *
+ * @param[in] c_p Pointer to process struct of
+ * currently executing process.
+ *
+ * @param[in] to Identifier of receiver process.
+ *
+ * @param[in] reply Non-zero if a reply is wanted.
+ *
+ * @param[in] func Function to execute in the
+ * context of the receiver.
+ * First argument will be a
+ * pointer to the process struct
+ * of the receiver process.
+ * Second argument will be 'arg'
+ * (see below). Third argument
+ * will be a pointer to a pointer
+ * to a heap fragment for storage
+ * of result returned from 'func'
+ * (i.e. an 'out' parameter).
+ *
+ * @param[in] arg Void pointer to argument
+ * to pass as second argument
+ * in call of 'func'.
+ *
+ * @param[in] prio Minimum priority that the
+ * signal will execute under.
+ * Either PRIORITY_MAX,
+ * PRIORITY_HIGH, PRIORITY_NORMAL,
+ * PRIORITY_LOW, or a negative
+ * value. A negative value will
+ * cause a minimum priority that
+ * equals the priority of the
+ * calling process (c_p).
+ *
+ * @returns If the request was sent,
+ * an internal ordinary
+ * reference; otherwise,
+ * THE_NON_VALUE (non-existing
+ * receiver).
+ */
+Eterm
+erts_proc_sig_send_rpc_request_prio(Process *c_p,
+ Eterm to,
+ int reply,
+ Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
+ void *arg,
+ int prio);
int
erts_proc_sig_send_dist_spawn_reply(Eterm node,
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 65a6e1c6ad..feff6ca8c8 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -10916,8 +10916,6 @@ request_system_task(Process *c_p, Eterm requester, Eterm target,
if (signal) {
erts_aint32_t state;
- if (priority_req != am_inherit)
- goto badarg;
state = erts_atomic32_read_acqb(&rp->state);
if (state & fail_state & ERTS_PSFLG_EXITING)
goto noproc;
@@ -10926,11 +10924,12 @@ request_system_task(Process *c_p, Eterm requester, Eterm target,
* Send rpc request signal without reply,
* and reply from the system task...
*/
- Eterm res = erts_proc_sig_send_rpc_request(c_p,
- target,
- 0, /* no reply */
- sched_sig_sys_task,
- (void *) st);
+ Eterm res = erts_proc_sig_send_rpc_request_prio(c_p,
+ target,
+ 0, /* no reply */
+ sched_sig_sys_task,
+ (void *) st,
+ prio);
if (is_non_value(res))
goto noproc;
return ret; /* signal sent... */