Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:24
erlang
5111-erts-Improve-flushing-of-signals.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 5111-erts-Improve-flushing-of-signals.patch of Package erlang
From ac579bbec058b313af19de4a6d536c6ab446bfed Mon Sep 17 00:00:00 2001 From: Rickard Green <rickard@erlang.org> Date: Tue, 11 May 2021 19:17:32 +0200 Subject: [PATCH 1/2] [erts] Improve flushing of signals --- erts/emulator/beam/bif.c | 75 ++++---- erts/emulator/beam/bif.h | 10 +- erts/emulator/beam/erl_bif_info.c | 221 +++++++++++++++++------- erts/emulator/beam/erl_bif_port.c | 89 ++++++++-- erts/emulator/beam/erl_message.c | 6 + erts/emulator/beam/erl_proc_sig_queue.c | 181 +++++++++++++++++-- erts/emulator/beam/erl_proc_sig_queue.h | 70 +++++++- erts/emulator/beam/erl_process.c | 62 ++++--- erts/emulator/beam/erl_process.h | 9 +- erts/emulator/beam/erl_process_dump.c | 17 +- erts/emulator/test/bif_SUITE.erl | 60 ++++++- erts/emulator/test/nif_SUITE.erl | 5 +- erts/emulator/test/port_SUITE.erl | 1 + erts/emulator/test/process_SUITE.erl | 178 +++++++++++++++++-- erts/etc/unix/etp-commands.in | 2 +- 15 files changed, 793 insertions(+), 193 deletions(-) diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index a30d6b6a9e..87241e6dce 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -1524,7 +1524,7 @@ erts_internal_await_exit_trap(BIF_ALIST_0) * terminated in order to ensure that signal order * is preserved. Yield if necessary. */ - erts_aint32_t state; + erts_aint32_t state = erts_atomic32_read_nob(&BIF_P->state); int reds = ERTS_BIF_REDS_LEFT(BIF_P); (void) erts_proc_sig_handle_incoming(BIF_P, &state, &reds, reds, !0); @@ -5395,45 +5395,52 @@ static BIF_RETTYPE bif_return_trap(BIF_ALIST_2) } static BIF_RETTYPE -bif_handle_signals_return(BIF_ALIST_1) +bif_handle_signals_return(BIF_ALIST_2) { - int local_only = BIF_P->sig_qs.flags & FS_LOCAL_SIGS_ONLY; - int sres, sreds, reds_left; + int reds_left; erts_aint32_t state; - reds_left = ERTS_BIF_REDS_LEFT(BIF_P); - sreds = reds_left; - - if (!local_only) { - erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MSGQ); - erts_proc_sig_fetch(BIF_P); - erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MSGQ); + if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS) { + flushed: + ASSERT(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS); + BIF_P->sig_qs.flags &= ~(FS_FLUSHED_SIGS|FS_FLUSHING_SIGS); + erts_set_gc_state(BIF_P, !0); /* Allow GC again... */ + BIF_RET(BIF_ARG_2); } + + if (!(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS)) { + int flags = ((is_internal_pid(BIF_ARG_1) + || is_internal_port(BIF_ARG_1)) + ? ERTS_PROC_SIG_FLUSH_FLG_FROM_ID + : ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL); + erts_proc_sig_init_flush_signals(BIF_P, flags, BIF_ARG_1); + if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS) + goto flushed; + } + + ASSERT(BIF_P->sig_qs.flags & FS_FLUSHING_SIGS); - state = erts_atomic32_read_nob(&BIF_P->state); - sres = erts_proc_sig_handle_incoming(BIF_P, &state, &sreds, - sreds, !0); - - BUMP_REDS(BIF_P, (int) sreds); - reds_left -= sreds; + reds_left = ERTS_BIF_REDS_LEFT(BIF_P); - if (state & ERTS_PSFLG_EXITING) { - BIF_P->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY; - ERTS_BIF_EXITED(BIF_P); - } - if (!sres | (reds_left <= 0)) { - /* - * More signals to handle or out of reds; need - * to yield and continue. Prevent fetching of - * more signals by setting local-sigs-only flag. - */ - BIF_P->sig_qs.flags |= FS_LOCAL_SIGS_ONLY; - ERTS_BIF_YIELD1(&erts_bif_handle_signals_return_export, - BIF_P, BIF_ARG_1); - } + state = erts_atomic32_read_nob(&BIF_P->state); + do { + int sreds = reds_left; + (void) erts_proc_sig_handle_incoming(BIF_P, &state, &sreds, + sreds, !0); + BUMP_REDS(BIF_P, (int) sreds); + if (state & ERTS_PSFLG_EXITING) + ERTS_BIF_EXITED(BIF_P); + if (BIF_P->sig_qs.flags & FS_FLUSHED_SIGS) + goto flushed; + reds_left -= sreds; + } while (reds_left > 0); - BIF_P->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY; - BIF_RET(BIF_ARG_1); + /* + * More signals to handle, but out of reductions. Yield + * and come back here and continue... + */ + ERTS_BIF_YIELD2(&erts_bif_handle_signals_return_export, + BIF_P, BIF_ARG_1, BIF_ARG_2); } Export bif_return_trap_export; @@ -5475,7 +5482,7 @@ void erts_init_bif(void) &bif_return_trap); erts_init_trap_export(&erts_bif_handle_signals_return_export, - am_erlang, am_bif_handle_signals_return, 1, + am_erlang, am_bif_handle_signals_return, 2, &bif_handle_signals_return); erts_await_result = erts_export_put(am_erts_internal, diff --git a/erts/emulator/beam/bif.h b/erts/emulator/beam/bif.h index 350f2ad430..a89ea5d4f2 100644 --- a/erts/emulator/beam/bif.h +++ b/erts/emulator/beam/bif.h @@ -522,12 +522,12 @@ do { \ extern Export erts_bif_handle_signals_return_export; -#define ERTS_BIF_HANDLE_SIGNALS_RETURN(P, VAL) \ - BIF_TRAP1(&erts_bif_handle_signals_return_export, (P), (VAL)) +#define ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(P, FROM, VAL) \ + BIF_TRAP2(&erts_bif_handle_signals_return_export, (P), (FROM), (VAL)) -#define ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(Ret, P, Val) \ - ERTS_BIF_PREP_TRAP1((Ret), &erts_bif_handle_signals_return_export, \ - (P), (Val)) +#define ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(Ret, P, From, Val) \ + ERTS_BIF_PREP_TRAP2((Ret), &erts_bif_handle_signals_return_export, \ + (P), (From), (Val)) #define ERTS_BIF_PREP_EXITED(RET, PROC) \ do { \ diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 2c27d460fd..430c4f1d80 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -1007,6 +1007,7 @@ process_info_aux(Process *c_p, Process *rp, ErtsProcLocks rp_locks, int item_ix, + Sint *msgq_len_p, int flags, Uint *reserve_sizep, Uint *reds); @@ -1025,11 +1026,12 @@ erts_process_info(Process *c_p, Eterm res; Eterm part_res[ERTS_PI_ARGS]; int item_ix_ix, ix; + Sint msgq_len = -1; if (ERTS_PI_FLAG_SINGELTON & flags) { ASSERT(item_ix_len == 1); res = process_info_aux(c_p, hfact, rp, rp_locks, item_ix[0], - flags, &reserve_size, reds); + &msgq_len, flags, &reserve_size, reds); return res; } @@ -1047,7 +1049,7 @@ erts_process_info(Process *c_p, ix = pi_arg2ix(am_messages); ASSERT(part_res[ix] == THE_NON_VALUE); res = process_info_aux(c_p, hfact, rp, rp_locks, ix, - flags, &reserve_size, reds); + &msgq_len, flags, &reserve_size, reds); ASSERT(res != am_undefined); ASSERT(res != THE_NON_VALUE); part_res[ix] = res; @@ -1057,7 +1059,7 @@ erts_process_info(Process *c_p, ix = item_ix[item_ix_ix]; if (part_res[ix] == THE_NON_VALUE) { res = process_info_aux(c_p, hfact, rp, rp_locks, ix, - flags, &reserve_size, reds); + &msgq_len, flags, &reserve_size, reds); ASSERT(res != am_undefined); ASSERT(res != THE_NON_VALUE); part_res[ix] = res; @@ -1092,6 +1094,92 @@ erts_process_info(Process *c_p, static void pi_setup_grow(int **arr, int *def_arr, Uint *sz, int ix); +static ERTS_INLINE int +pi_maybe_flush_signals(Process *c_p, int pi_flags) +{ + int reds_left; + erts_aint32_t state; + + /* + * pi_maybe_flush_signals() flush signals in callers + * signal queue for two different reasons: + * + * 1. If we need 'message_queue_len', but not 'messages', we need + * to handle all signals in the middle queue in order for + * 'c_p->sig_qs.len' to reflect the amount of messages in the + * message queue. We could count traverse the queues, but it + * is better to handle all signals in the queue instead since + * this is work we anyway need to do at some point. + * + * 2. Ensures that all signals that the caller might have sent to + * itself are handled before we gather information. + * + * This is, however, not strictly necessary. process_info() is + * not documented to send itself a signal when gathering + * information about itself. That is, the operation is not + * bound by the signal order guarantee when gathering + * information about itself. If we do not handle outstanding + * signals before gathering the information, outstanding signals + * from the caller to itself will not be part of the result. + * This would not be wrong, but perhaps surprising for the user. + * We continue doing it this way for now, since this is how it + * has been done for a very long time. We should, however, + * consider changing this in a future release, since this signal + * handling is not for free, although quite cheap since these + * signals anyway must be handled at some point. + */ + + if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) { + flushed: + + ASSERT(((pi_flags & (ERTS_PI_FLAG_WANT_MSGS + | ERTS_PI_FLAG_NEED_MSGQ_LEN)) + != ERTS_PI_FLAG_NEED_MSGQ_LEN) + || !c_p->sig_qs.cont); + ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); + + c_p->sig_qs.flags &= ~(FS_FLUSHED_SIGS|FS_FLUSHING_SIGS); + erts_set_gc_state(c_p, !0); /* Allow GC again... */ + return 0; /* done, all signals handled... */ + } + + state = erts_atomic32_read_nob(&c_p->state); + + if (!(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) { + int flush_flags = 0; + if (((pi_flags & (ERTS_PI_FLAG_WANT_MSGS + | ERTS_PI_FLAG_NEED_MSGQ_LEN)) + == ERTS_PI_FLAG_NEED_MSGQ_LEN) + && c_p->sig_qs.cont) { + flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ; + } + if (state & ERTS_PSFLG_MAYBE_SELF_SIGS) + flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_FROM_ID; + if (!flush_flags) + return 0; /* done; no need to flush... */ + erts_proc_sig_init_flush_signals(c_p, flush_flags, c_p->common.id); + if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) + goto flushed; + } + + ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); + reds_left = ERTS_BIF_REDS_LEFT(c_p); + + do { + int sreds = reds_left; + (void) erts_proc_sig_handle_incoming(c_p, &state, &sreds, + sreds, !0); + BUMP_REDS(c_p, (int) sreds); + if (state & ERTS_PSFLG_EXITING) + return -1; /* process exiting... */ + if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) + goto flushed; + reds_left -= sreds; + } while (reds_left > 0); + + return 1; /* yield and continue here later... */ +} + static BIF_RETTYPE process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) { @@ -1110,41 +1198,6 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) ERTS_CT_ASSERT(ERTS_PI_DEF_ARR_SZ > 0); - if (c_p->common.id == pid) { - int local_only = c_p->sig_qs.flags & FS_LOCAL_SIGS_ONLY; - int sres, sreds, reds_left; - - reds_left = ERTS_BIF_REDS_LEFT(c_p); - sreds = reds_left; - - if (!local_only) { - erts_proc_sig_queue_lock(c_p); - erts_proc_sig_fetch(c_p); - erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); - } - - sres = erts_proc_sig_handle_incoming(c_p, &state, &sreds, sreds, !0); - - BUMP_REDS(c_p, (int) sreds); - reds_left -= sreds; - - if (state & ERTS_PSFLG_EXITING) { - c_p->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY; - goto exited; - } - if (!sres | (reds_left <= 0)) { - /* - * More signals to handle or out of reds; need - * to yield and continue. Prevent fetching of - * more signals by setting local-sigs-only flag. - */ - c_p->sig_qs.flags |= FS_LOCAL_SIGS_ONLY; - goto yield; - } - - c_p->sig_qs.flags &= ~FS_LOCAL_SIGS_ONLY; - } - if (is_atom(opt)) { int ix = pi_arg2ix(opt); item_ix[0] = ix; @@ -1190,7 +1243,16 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) goto badarg; } - if (is_not_internal_pid(pid)) { + if (c_p->common.id == pid) { + int res = pi_maybe_flush_signals(c_p, flags); + if (res != 0) { + if (res > 0) + goto yield; + else + goto exited; + } + } + else if (is_not_internal_pid(pid)) { if (is_external_pid(pid) && external_pid_dist_entry(pid) == erts_this_dist_entry) goto undefined; @@ -1226,6 +1288,10 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) } if (flags & ERTS_PI_FLAG_NEED_MSGQ_LEN) { ASSERT(locks & ERTS_PROC_LOCK_MAIN); + if (rp->sig_qs.flags & FS_FLUSHING_SIGS) { + erts_proc_unlock(rp, locks); + goto send_signal; + } erts_proc_sig_queue_lock(rp); erts_proc_sig_fetch(rp); if (rp->sig_qs.cont) { @@ -1264,7 +1330,8 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) if (c_p == rp || !ERTS_PROC_HAS_INCOMING_SIGNALS(c_p)) ERTS_BIF_PREP_RET(ret, res); else - ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(ret, c_p, res); + ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(ret, c_p, + pid, res); done: @@ -1355,6 +1422,7 @@ process_info_aux(Process *c_p, Process *rp, ErtsProcLocks rp_locks, int item_ix, + Sint *msgq_len_p, int flags, Uint *reserve_sizep, Uint *reds) @@ -1468,8 +1536,10 @@ process_info_aux(Process *c_p, case ERTS_PI_IX_MESSAGES: { ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN); - if (rp->sig_qs.len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE)) + if (rp->sig_qs.len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE)) { + *msgq_len_p = 0; res = NIL; + } else { int info_on_self = !(flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER); ErtsMessageInfo *mip; @@ -1487,8 +1557,8 @@ process_info_aux(Process *c_p, heap_need = erts_proc_sig_prep_msgq_for_inspection(c_p, rp, rp_locks, info_on_self, - mip); - len = rp->sig_qs.len; + mip, msgq_len_p); + len = *msgq_len_p; heap_need += len*2; /* Cons cells */ @@ -1517,9 +1587,13 @@ process_info_aux(Process *c_p, } case ERTS_PI_IX_MESSAGE_QUEUE_LEN: { - Sint len = rp->sig_qs.len; + Sint len = *msgq_len_p; + if (len < 0) { + ASSERT((flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER) + || !rp->sig_qs.cont); + len = rp->sig_qs.len; + } ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN); - ASSERT((flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER) || !rp->sig_qs.cont); ASSERT(len >= 0); if (len <= MAX_SMALL) res = make_small(len); @@ -3690,42 +3764,54 @@ BIF_RETTYPE erts_internal_is_process_alive_2(BIF_ALIST_2) BIF_ERROR(BIF_P, BADARG); if (!erts_proc_sig_send_is_alive_request(BIF_P, BIF_ARG_1, BIF_ARG_2)) { if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, am_ok); + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, BIF_ARG_1, am_ok); } BIF_RET(am_ok); } BIF_RETTYPE is_process_alive_1(BIF_ALIST_1) { + if (is_internal_pid(BIF_ARG_1)) { - erts_aint32_t state; + BIF_RETTYPE result; Process *rp; if (BIF_ARG_1 == BIF_P->common.id) BIF_RET(am_true); rp = erts_proc_lookup_raw(BIF_ARG_1); - if (!rp) - BIF_RET(am_false); + if (!rp) { + result = am_false; + } + else { + erts_aint32_t state = erts_atomic32_read_acqb(&rp->state); + if (state & (ERTS_PSFLG_EXITING + | ERTS_PSFLG_SIG_Q + | ERTS_PSFLG_SIG_IN_Q)) { + /* + * If in exiting state, trap out and send 'is alive' + * request and wait for it to complete termination. + * + * If process has signals enqueued, we need to + * send it an 'is alive' request via its signal + * queue in order to ensure that signal order is + * preserved (we may earlier have sent it an + * exit signal that has not been processed yet). + */ + BIF_TRAP1(is_process_alive_trap, BIF_P, BIF_ARG_1); + } - state = erts_atomic32_read_acqb(&rp->state); - if (state & (ERTS_PSFLG_EXITING - | ERTS_PSFLG_SIG_Q - | ERTS_PSFLG_SIG_IN_Q)) { + result = am_true; + } + + if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) { /* - * If in exiting state, trap out and send 'is alive' - * request and wait for it to complete termination. - * - * If process has signals enqueued, we need to - * send it an 'is alive' request via its signal - * queue in order to ensure that signal order is - * preserved (we may earlier have sent it an - * exit signal that has not been processed yet). + * Ensure that signal order of signals from inspected + * process to us is preserved... */ - BIF_TRAP1(is_process_alive_trap, BIF_P, BIF_ARG_1); + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, BIF_ARG_1, result); } - - BIF_RET(am_true); + BIF_RET(result); } if (is_external_pid(BIF_ARG_1)) { @@ -3734,6 +3820,8 @@ BIF_RETTYPE is_process_alive_1(BIF_ALIST_1) } BIF_ERROR(BIF_P, BADARG); + + } static Eterm @@ -4218,10 +4306,10 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) ERTS_ASSERT_IS_NOT_EXITING(BIF_P); BIF_RET(am_undefined); } - erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); erts_proc_sig_fetch(p); erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + state = erts_atomic32_read_nob(&BIF_P->state); do { int reds = CONTEXT_REDS; sigs_done = erts_proc_sig_handle_incoming(p, @@ -4284,10 +4372,11 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) ERTS_ASSERT_IS_NOT_EXITING(BIF_P); BIF_RET(am_undefined); } - + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); erts_proc_sig_fetch(p); erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + state = erts_atomic32_read_nob(&BIF_P->state); do { int reds = CONTEXT_REDS; sigs_done = erts_proc_sig_handle_incoming(p, diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c index fc415aa4e5..737a6be15f 100644 --- a/erts/emulator/beam/erl_bif_port.c +++ b/erts/emulator/beam/erl_bif_port.c @@ -238,8 +238,17 @@ BIF_RETTYPE erts_internal_port_command_3(BIF_ALIST_3) } else { /* Ensure signal order is preserved... */ - if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) - ERTS_BIF_PREP_HANDLE_SIGNALS_RETURN(res, BIF_P, res); + if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_PREP_HANDLE_SIGNALS_FROM_RETURN(res, BIF_P, + from, res); + } } return res; @@ -287,8 +296,16 @@ BIF_RETTYPE erts_internal_port_call_3(BIF_ALIST_3) ERTS_BIF_EXITED(BIF_P); else { /* Ensure signal order is preserved... */ - if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } } BIF_RET(retval); @@ -335,8 +352,16 @@ BIF_RETTYPE erts_internal_port_control_3(BIF_ALIST_3) ERTS_BIF_EXITED(BIF_P); else { /* Ensure signal order is preserved... */ - if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } } BIF_RET(retval); @@ -382,8 +407,16 @@ BIF_RETTYPE erts_internal_port_close_1(BIF_ALIST_1) } /* Ensure signal order is preserved... */ - if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } BIF_RET(retval); } @@ -426,8 +459,16 @@ BIF_RETTYPE erts_internal_port_connect_2(BIF_ALIST_2) } /* Ensure signal order is preserved... */ - if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } BIF_RET(retval); } @@ -435,7 +476,7 @@ BIF_RETTYPE erts_internal_port_connect_2(BIF_ALIST_2) BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1) { Eterm retval; - Port* prt; + Port* prt = NULL; if (is_internal_port(BIF_ARG_1) || is_atom(BIF_ARG_1)) { prt = sig_lookup_port(BIF_P, BIF_ARG_1); @@ -474,8 +515,16 @@ BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1) } /* Ensure signal order is preserved... */ - if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } BIF_RET(retval); } @@ -484,7 +533,7 @@ BIF_RETTYPE erts_internal_port_info_1(BIF_ALIST_1) BIF_RETTYPE erts_internal_port_info_2(BIF_ALIST_2) { Eterm retval; - Port* prt; + Port* prt = NULL; if (is_internal_port(BIF_ARG_1) || is_atom(BIF_ARG_1)) { prt = sig_lookup_port(BIF_P, BIF_ARG_1); @@ -523,8 +572,16 @@ BIF_RETTYPE erts_internal_port_info_2(BIF_ALIST_2) } /* Ensure signal order is preserved... */ - if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) - ERTS_BIF_HANDLE_SIGNALS_RETURN(BIF_P, retval); + if (ERTS_PROC_HAS_INCOMING_SIGNALS(BIF_P)) { + Eterm from; + if (is_internal_port(BIF_ARG_1)) + from = BIF_ARG_1; + else if (prt) + from = prt->common.id; + else + from = NIL; + ERTS_BIF_HANDLE_SIGNALS_FROM_RETURN(BIF_P, from, retval); + } BIF_RET(retval); } diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 151d0b41ad..0026ecce99 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -539,6 +539,9 @@ erts_queue_proc_message(Process* sender, Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* mp, Eterm msg) { + if (sender == receiver) + (void) erts_atomic32_read_bor_nob(&sender->state, + ERTS_PSFLG_MAYBE_SELF_SIGS); ERL_MESSAGE_TERM(mp) = msg; ERL_MESSAGE_FROM(mp) = sender->common.id; queue_messages(sender->common.id, receiver, receiver_locks, @@ -552,6 +555,9 @@ erts_queue_proc_messages(Process* sender, Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* first, ErtsMessage** last, Uint len) { + if (sender == receiver) + (void) erts_atomic32_read_bor_nob(&sender->state, + ERTS_PSFLG_MAYBE_SELF_SIGS); queue_messages(sender->common.id, receiver, receiver_locks, prepend_pending_sig_maybe(sender, receiver, first), last, len); diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 7cb5f9e491..bfddf9e48f 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -729,7 +729,7 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, /* Tracing requires sender for local procs and ports. The assertions below * will not catch errors after time-of-death, but ought to find most * problems. */ - ASSERT(sender != NULL || + ASSERT(sender != NULL || op == ERTS_SIG_Q_OP_FLUSH || (is_normal_sched && esdp->pending_signal.sig == sig) || (!(is_internal_pid(from) && erts_proc_lookup(from) != NULL) && @@ -824,6 +824,11 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, sigp = &first; first_last_done: + + if ((void *) sender == (void *) rp) + (void) erts_atomic32_read_bor_nob(&((Process *) sender)->state, + ERTS_PSFLG_MAYBE_SELF_SIGS); + sig->common.specific.next = NULL; /* may add signals before sig */ @@ -2751,6 +2756,83 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to) } } +void +erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm id) +{ + int force_flush_buffers = 0, enqueue_mq, fetch_sigs; + ErtsSignal *sig; + + ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + + ASSERT(!(c_p->sig_qs.flags & (FS_FLUSHING_SIGS|FS_FLUSHED_SIGS))); + ASSERT(flags); + ASSERT((flags & ~ERTS_PROC_SIG_FLUSH_FLGS) == 0); + ASSERT(!(flags & ERTS_PROC_SIG_FLUSH_FLG_FROM_ID) + || is_internal_pid(id) || is_internal_port(id)); + + sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSignalCommon)); + sig->common.next = NULL; + sig->common.specific.attachment = NULL; + sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_FLUSH, + ERTS_SIG_Q_TYPE_UNDEFINED, + 0); + switch (flags) { + case ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL: + id = c_p->common.id; + force_flush_buffers = !0; + /* Fall through... */ + case ERTS_PROC_SIG_FLUSH_FLG_FROM_ID: + if (!proc_queue_signal(NULL, id, c_p->common.id, sig, + force_flush_buffers, ERTS_SIG_Q_OP_FLUSH)) + ERTS_INTERNAL_ERROR("Failed to send flush signal to ourselves"); + enqueue_mq = 0; + fetch_sigs = !0; + break; + case ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ: + enqueue_mq = !0; + fetch_sigs = 0; + break; + default: + enqueue_mq = !!(flags & ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ); + fetch_sigs = !0; + break; + } + + erts_set_gc_state(c_p, 0); + + if (fetch_sigs) { + erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(c_p); + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + } + + c_p->sig_qs.flags |= FS_FLUSHING_SIGS; + + if (enqueue_mq) { + if (!c_p->sig_qs.cont) { + c_p->sig_qs.flags |= FS_FLUSHED_SIGS; + erts_free(ERTS_ALC_T_SIG_DATA, sig); + } + else { + if (!c_p->sig_qs.nmsigs.last) { + ASSERT(!c_p->sig_qs.nmsigs.next); + c_p->sig_qs.nmsigs.next = c_p->sig_qs.cont_last; + } + else { + ErtsSignal *lsig = (ErtsSignal *) *c_p->sig_qs.nmsigs.last; + ASSERT(c_p->sig_qs.nmsigs.next); + ASSERT(lsig && !lsig->common.specific.next); + lsig->common.specific.next = c_p->sig_qs.cont_last; + } + + c_p->sig_qs.nmsigs.last = c_p->sig_qs.cont_last; + *c_p->sig_qs.cont_last = (ErtsMessage *) sig; + c_p->sig_qs.cont_last = &sig->common.next; + } + } + +} + static int handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp) { @@ -5172,7 +5172,7 @@ erts_proc_sig_handle_incoming(Process *c int *redsp, int max_reds, int local_only) { Eterm tag; - erts_aint32_t state; + erts_aint32_t state = *statep; int yield, cnt, limit, abs_lim, msg_tracing, save_in_msgq; ErtsMessage *sig, ***next_nm_sig; ErtsSigRecvTracing tracing; @@ -5182,8 +5182,7 @@ erts_proc_sig_handle_incoming(Process *c ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); - state = erts_atomic32_read_nob(&c_p->state); - if (!local_only) { + if (!local_only && !(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) { if (ERTS_PSFLG_SIG_IN_Q & state) { erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); @@ -5694,6 +5775,20 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); break; + case ERTS_SIG_Q_OP_FLUSH: + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); + c_p->sig_qs.flags |= FS_FLUSHED_SIGS; + remove_nm_sig(c_p, sig, next_nm_sig); + erts_free(ERTS_ALC_T_SIG_DATA, sig); + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + /* + * The caller has been exclusively handling signals until this + * point. Break out and let the process continue with other + * things as well... + */ + goto stop; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: { Uint16 type = ERTS_PROC_SIG_TYPE(tag); @@ -5810,7 +5905,7 @@ stop: { *next_nm_sig = &c_p->sig_qs.cont; if (c_p->sig_qs.nmsigs.last == tracing.messages.next) c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont; - *statep = erts_atomic32_read_nob(&c_p->state); + state = erts_atomic32_read_nob(&c_p->state); } else { ASSERT(!c_p->sig_qs.nmsigs.next); @@ -5818,7 +5913,6 @@ stop: { state = erts_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_SIG_Q); state &= ~ERTS_PSFLG_SIG_Q; - *statep = state; } if (tracing.messages.next != &c_p->sig_qs.cont) { @@ -5864,7 +5958,7 @@ stop: { ASSERT(c_p->sig_qs.cont); - *statep = erts_atomic32_read_nob(&c_p->state); + state = erts_atomic32_read_nob(&c_p->state); res = 0; } @@ -5897,10 +5991,21 @@ stop: { state = erts_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_SIG_Q); state &= ~ERTS_PSFLG_SIG_Q; - *statep = state; res = !0; } + if (!!(state & ERTS_PSFLG_MAYBE_SELF_SIGS) + & !(state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))) { + /* + * We know we do not have any outstanding signals + * from ourselves... + */ + (void) erts_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_MAYBE_SELF_SIGS); + state &= ~ERTS_PSFLG_MAYBE_SELF_SIGS; + } + *statep = state; + /* Ensure that 'save' doesn't point to a receive marker... */ if (*c_p->sig_qs.save && ERTS_SIG_IS_RECV_MARKER(*c_p->sig_qs.save)) { @@ -6183,6 +6288,12 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp, } break; + case ERTS_SIG_Q_OP_FLUSH: + ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); + c_p->sig_qs.flags |= FS_FLUSHED_SIGS; + erts_free(ERTS_ALC_T_SIG_DATA, sig); + break; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: destroy_trace_info((ErtsSigTraceInfo *) sig); break; @@ -6280,6 +6391,7 @@ clear_seq_trace_token(ErtsMessage *sig) case ERTS_SIG_Q_OP_RPC: case ERTS_SIG_Q_OP_RECV_MARK: case ERTS_SIG_Q_OP_ADJ_MSGQ: + case ERTS_SIG_Q_OP_FLUSH: break; default: @@ -6292,8 +6404,33 @@ clear_seq_trace_token(ErtsMessage *sig) void erts_proc_sig_clear_seq_trace_tokens(Process *c_p) { - erts_proc_sig_fetch(c_p); - ERTS_FOREACH_SIG_PRIVQS(c_p, sig, clear_seq_trace_token(sig)); + int ix; + ErtsSignalInQueueBufferArray *bap; + int unget_info; + ErtsMessage *qs[] = {c_p->sig_qs.first, + c_p->sig_qs.cont, + c_p->sig_inq.first}; + + ERTS_LC_ASSERT(erts_thr_progress_is_blocking()); + + for (ix = 0; ix < sizeof(qs)/sizeof(qs[0]); ix++) { + ErtsMessage *sigp; + for (sigp = qs[ix]; sigp; sigp = sigp->next) + clear_seq_trace_token(sigp); + } + + bap = erts_proc_sig_queue_get_buffers(c_p, &unget_info); + if (bap) { + for (ix = 0; ix < ERTS_PROC_SIG_INQ_BUFFERED_NR_OF_BUFFERS; ix++) { + ErtsSignalInQueueBuffer* bp = &bap->slots[ix]; + if (bp->b.alive) { + ErtsMessage *sigp; + for (sigp = bp->b.queue.first; sigp; sigp = sigp->next) + clear_seq_trace_token(sigp); + } + } + erts_proc_sig_queue_unget_buffers(bap, unget_info); + } } Uint @@ -6419,6 +6556,10 @@ erts_proc_sig_signal_size(ErtsSignal *sig) break; } + case ERTS_SIG_Q_OP_FLUSH: + size = sizeof(ErtsSignalCommon); + break; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: size = sizeof(ErtsSigTraceInfo); break; @@ -6537,6 +6678,7 @@ erts_proc_sig_receive_helper(Process *c_p, if (max_reds < reds) max_reds = reds; #endif + state = erts_atomic32_read_nob(&c_p->state); (void) erts_proc_sig_handle_incoming(c_p, &state, &reds, max_reds, !0); consumed_reds += reds; @@ -7246,7 +7388,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, ErtsProcLocks rp_locks, int info_on_self, - ErtsMessageInfo *mip) + ErtsMessageInfo *mip, + Sint *msgq_len_p) { Uint tot_heap_size; ErtsMessage *mp, **mpp; @@ -7320,7 +7463,11 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, mp = mp->next; } - ASSERT(c_p->sig_qs.len == i); + + ASSERT(info_on_self || c_p->sig_qs.len == i); + ASSERT(!info_on_self || c_p->sig_qs.len >= i); + + *msgq_len_p = i; return tot_heap_size; } @@ -7409,7 +7556,6 @@ erts_internal_dirty_process_handle_signals_1(BIF_ALIST_1) BIF_RET(am_normal); /* will handle signals itself... */ } else { - erts_aint32_t state; int done; Eterm res = am_false; int reds = 0; @@ -7708,6 +7856,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, case ERTS_SIG_Q_OP_PROCESS_INFO: case ERTS_SIG_Q_OP_RECV_MARK: case ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK: + case ERTS_SIG_Q_OP_FLUSH: break; default: diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index b3aefff0b0..deb1b802e1 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -110,7 +110,7 @@ typedef struct { * Note that not all signal are handled using this functionality! */ -#define ERTS_SIG_Q_OP_MAX 18 +#define ERTS_SIG_Q_OP_MAX 19 #define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */ #define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/ @@ -130,7 +130,8 @@ typedef struct { #define ERTS_SIG_Q_OP_ALIAS_MSG 15 #define ERTS_SIG_Q_OP_RECV_MARK 16 #define ERTS_SIG_Q_OP_UNLINK_ACK 17 -#define ERTS_SIG_Q_OP_ADJ_MSGQ ERTS_SIG_Q_OP_MAX +#define ERTS_SIG_Q_OP_ADJ_MSGQ 18 +#define ERTS_SIG_Q_OP_FLUSH ERTS_SIG_Q_OP_MAX #define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 10) @@ -1133,6 +1134,9 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to); * * @param[out] statep Pointer to process state after * signal handling. May not be NULL. + * The state should recently have + * been updated before calling + * this function. * * @param[in,out] redsp Pointer to an integer containing * reductions. On input, the amount @@ -1253,6 +1257,58 @@ erts_proc_sig_receive_helper(Process *c_p, int fcalls, int neg_o_reds, ErtsMessage **msgpp, int *get_outp); +/* + * CLEAN_SIGQ - Flush until middle queue is empty, i.e. + * the content of inner+middle queue equals + * the message queue. + */ +#define ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ (1 << 0) +/* + * FROM_ALL - Flush signals from all local senders (processes + * and ports). + */ +#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL (1 << 1) +/* + * FROM_ID - Flush signals from process or port identified + * by 'id'. + */ +#define ERTS_PROC_SIG_FLUSH_FLG_FROM_ID (1 << 2) + +/* + * All erts_proc_sig_init_flush_signals() flags. + */ +#define ERTS_PROC_SIG_FLUSH_FLGS \ + (ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ \ + | ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL \ + | ERTS_PROC_SIG_FLUSH_FLG_FROM_ID) + +/** + * + * @brief Initialize flush of signals from another process or port + * + * Inserts a flush signal in the outer signal queue of + * current process and sets the FS_FLUSHING_SIGS flag in + * 'c_p->sig_qs.flags'. When the flush signal has been + * handled the FS_FLUSHED_SIGS flag is set as well. + * + * While the flushing is ongoing the process *should* only + * handle incoming signals and not execute Erlang code. When + * the functionality that initiated the flush detects that + * the flush is done by the FS_FLUSHED_SIGS flag being set, + * it should clear both the FS_FLUSHED_SIGS flag and the + * FS_FLUSHING_SIGS flag. + * + * @param[in] c_p Pointer to process struct of + * currently executing process. + * flags Flags indicating how to flush. + * (see above). + * from Identifier of sender to flush + * signals from in case the + * FROM_ID flag is set. + */ +void +erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm from); + /** * * @brief Fetch signals from the outer queue @@ -1370,12 +1426,16 @@ typedef struct { * * @param[in] mip Pointer to array of * ErtsMessageInfo structures. + * + * @param[out] msgq_lenp Pointer to integer containing + * amount of messages. */ Uint erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, ErtsProcLocks rp_locks, int info_on_self, - ErtsMessageInfo *mip); + ErtsMessageInfo *mip, + Sint *msgq_lenp); /** * @@ -1744,7 +1744,9 @@ erts_proc_sig_fetch(Process *proc) == (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_MSGQ))); - ASSERT(!(proc->sig_qs.flags & FS_HANDLING_SIGS)); + ASSERT(!(proc->sig_qs.flags & FS_FLUSHING_SIGS) + || ERTS_PROC_IS_EXITING(proc) + || ERTS_IS_CRASH_DUMPING); ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index ff81b453a7..393de2b7b1 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -10063,27 +10063,30 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) if (state & ERTS_PSFLG_RUNNING_SYS) { if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) { - int local_only = (!!(p->sig_qs.flags & FS_LOCAL_SIGS_ONLY) - & !(state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLGS_DIRTY_WORK))); - if (!local_only | !!(state & ERTS_PSFLG_SIG_Q)) { - int sig_reds; - /* - * If we have dirty work scheduled we allow - * usage of all reductions since we need to - * handle all signals before doing dirty - * work... - */ - if (state & ERTS_PSFLGS_DIRTY_WORK) - sig_reds = reds; - else - sig_reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED; - (void) erts_proc_sig_handle_incoming(p, - &state, - &sig_reds, - sig_reds, - local_only); - reds -= sig_reds; - } + int sig_reds; + /* + * If we have dirty work scheduled we allow + * usage of all reductions since we need to + * handle all signals before doing dirty + * work... + * + * If a BIF is flushing signals, we also allow + * usage of all reductions since the BIF cannot + * continue exectution until the flush + * completes... + */ + if (state & ERTS_PSFLGS_DIRTY_WORK) + sig_reds = reds; + else if (p->sig_qs.flags & FS_FLUSHING_SIGS) + sig_reds = reds; + else + sig_reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED; + (void) erts_proc_sig_handle_incoming(p, + &state, + &sig_reds, + sig_reds, + 0); + reds -= sig_reds; } if ((state & (ERTS_PSFLG_SYS_TASKS | ERTS_PSFLG_EXITING)) == ERTS_PSFLG_SYS_TASKS) { @@ -10093,8 +10096,14 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) * hand written beam assembly in * prim_eval:'receive'. If GC is delayed we are * not allowed to execute system tasks. + * + * We also don't allow execution of system tasks + * if a BIF is flushing signals, since there are + * system tasks that might need to fetch from the + * outer signal queue... */ - if (!(p->flags & F_DELAY_GC)) { + if (!(p->flags & F_DELAY_GC) + && !(p->sig_qs.flags & FS_FLUSHING_SIGS)) { int cost = execute_sys_tasks(p, &state, reds); calls += cost; reds -= cost; @@ -10622,15 +10631,19 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) break; case ERTS_PSTT_PRIO_SIG: { erts_aint32_t fail_state, state; - int sig_res, sig_reds = reds; + int sig_res, sig_reds; st_res = am_false; + ASSERT(!(c_p->sig_qs.flags & FS_FLUSHING_SIGS)); + if (st->arg[0] == am_false) { erts_proc_sig_queue_lock(c_p); erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + st->arg[0] = am_true; } + state = erts_atomic32_read_nob(&c_p->state); sig_reds = reds; sig_res = erts_proc_sig_handle_incoming(c_p, &state, &sig_reds, reds, !0); @@ -10644,8 +10657,6 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) if (sig_res) break; - st->arg[0] = am_true; - fail_state = ERTS_PSFLG_EXITING; if (schedule_process_sys_task(c_p, st_prio, st, &fail_state)) { @@ -10656,6 +10667,7 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) state = erts_atomic32_read_nob(&c_p->state); exit_permanent_prio_elevation(c_p, state, st_prio); } + break; } case ERTS_PSTT_TEST: diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 81958e373a..0459aeaba5 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1246,8 +1246,9 @@ void erts_check_for_holes(Process* p); process table. Always ACTIVE while EXITING. Never SUSPENDED unless also FREE. */ #define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(5) -/* UNUSED */ -#define ERTS_PSFLG_UNUSED ERTS_PSFLG_BIT(6) +/* MAYBE_SELF_SIGS - We might have outstanding signals + from ourselves to ourselvs. */ +#define ERTS_PSFLG_MAYBE_SELF_SIGS ERTS_PSFLG_BIT(6) /* ACTIVE - Process "wants" to execute */ #define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(7) /* IN_RUNQ - Real process (not proxy) struct used in a @@ -1572,10 +1573,12 @@ extern int erts_system_profile_ts_type; #define FS_OFF_HEAP_MSGQ (1 << 0) /* Off heap msg queue */ #define FS_ON_HEAP_MSGQ (1 << 1) /* On heap msg queue */ #define FS_OFF_HEAP_MSGQ_CHNG (1 << 2) /* Off heap msg queue changing */ -#define FS_LOCAL_SIGS_ONLY (1 << 3) /* Handle privq sigs only */ +#define FS_UNUSED (1 << 3) /* Unused */ #define FS_HANDLING_SIGS (1 << 4) /* Process is handling signals */ #define FS_WAIT_HANDLE_SIGS (1 << 5) /* Process is waiting to handle signals */ #define FS_DELAYED_PSIGQS_LEN (1 << 6) /* Delayed update of sig_qs.len */ +#define FS_FLUSHING_SIGS (1 << 7) /* Currently flushing signals */ +#define FS_FLUSHED_SIGS (1 << 8) /* Flushing of signals completed */ /* * F_DISABLE_GC and F_DELAY_GC are similar. Both will prevent diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c index f95c7ad1d7..9fa5969b5e 100644 --- a/erts/emulator/beam/erl_process_dump.c +++ b/erts/emulator/beam/erl_process_dump.c @@ -159,9 +159,12 @@ Uint erts_process_memory(Process *p, int include_sigs_in_transit) * Size of message queue plus size of all signals * in transit to the process! */ - erts_proc_sig_queue_lock(p); - erts_proc_sig_fetch(p); - erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + if (!(p->sig_qs.flags & FS_FLUSHING_SIGS) + || ERTS_IS_CRASH_DUMPING) { + erts_proc_sig_queue_lock(p); + erts_proc_sig_fetch(p); + erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + } ERTS_FOREACH_SIG_PRIVQS( p, mp, @@ -228,7 +231,9 @@ dump_process_info(fmtfn_t to, void *to_arg, Process *p) if (ERTS_TRACE_FLAGS(p) & F_SENSITIVE) return; - erts_proc_sig_fetch(p); + if (!(p->sig_qs.flags & FS_FLUSHING_SIGS) || ERTS_IS_CRASH_DUMPING) { + erts_proc_sig_fetch(p); + } if (p->sig_qs.first || p->sig_qs.cont) { erts_print(to, to_arg, "=proc_messages:%T\n", p->common.id); @@ -1123,8 +1128,8 @@ erts_dump_extended_process_state(fmtfn_t to, void *to_arg, erts_aint32_t psflg) erts_print(to, to_arg, "FREE"); break; case ERTS_PSFLG_EXITING: erts_print(to, to_arg, "EXITING"); break; - case ERTS_PSFLG_UNUSED: - erts_print(to, to_arg, "UNUSED"); break; + case ERTS_PSFLG_MAYBE_SELF_SIGS: + erts_print(to, to_arg, "MAYBE_SELF_SIGS"); break; case ERTS_PSFLG_ACTIVE: erts_print(to, to_arg, "ACTIVE"); break; case ERTS_PSFLG_IN_RUNQ: diff --git a/erts/emulator/test/bif_SUITE.erl b/erts/emulator/test/bif_SUITE.erl index c5be79528b..ee6c494145 100644 --- a/erts/emulator/test/bif_SUITE.erl +++ b/erts/emulator/test/bif_SUITE.erl @@ -37,6 +37,7 @@ error_stacktrace_during_call_trace/1, group_leader_prio/1, group_leader_prio_dirty/1, is_process_alive/1, + is_process_alive_signal_from/1, process_info_blast/1, os_env_case_sensitivity/1, verify_middle_queue_save/1, @@ -57,7 +58,8 @@ all() -> erl_crash_dump_bytes, min_max, erlang_halt, is_builtin, error_stacktrace, error_stacktrace_during_call_trace, group_leader_prio, group_leader_prio_dirty, - is_process_alive, process_info_blast, os_env_case_sensitivity, + is_process_alive, is_process_alive_signal_from, + process_info_blast, os_env_case_sensitivity, verify_middle_queue_save, test_length,fixed_apply_badarg, external_fun_apply3]. @@ -1205,6 +1207,51 @@ is_process_alive(Config) when is_list(Config) -> Ps), ok. +is_process_alive_signal_from(Config) when is_list(Config) -> + process_flag(priority, high), + process_flag(scheduler, 1), + Schdlr = case erlang:system_info(schedulers_online) of + 1 -> 1; + _ -> 2 + end, + X = is_process_alive_signal_from_test(100000, 0, Schdlr), + erlang:display({exits_detected, X}), + {comment, integer_to_list(X) ++ " exited processes detected"}. + +is_process_alive_signal_from_test(0, X, _Schdlr) -> + X; +is_process_alive_signal_from_test(N, X, Schdlr) -> + Tester = self(), + {Testee, TMon} = spawn_opt(fun () -> + Mon = erlang:monitor(process, Tester), + Tester ! {self(), ready}, + busy_wait_go(), + _ = erlang:demonitor(Mon), + exit(normal) + end, + [link, + monitor, + {priority, high}, + {scheduler, Schdlr}]), + receive {Testee, ready} -> ok end, + {monitored_by, MBList1} = process_info(self(), monitored_by), + true = lists:member(Testee, MBList1), + erlang:yield(), + Testee ! {go, ok}, + erlang:yield(), + NewX = case erlang:is_process_alive(Testee) of + true -> + X; + false -> + %% Demonitor signal should have reached us before the + %% is-process-alive reply... + {monitored_by, MBList2} = process_info(self(), monitored_by), + false = lists:member(Testee, MBList2), + X+1 + end, + receive {'DOWN', TMon, process, Testee, normal} -> ok end, + is_process_alive_signal_from_test(N-1, NewX, Schdlr). + process_info_blast(Config) when is_list(Config) -> Tester = self(), NoAttackers = 1000, @@ -1396,7 +1443,16 @@ external_fun_apply3(_Config) -> ok. %% helpers - + +busy_wait_go() -> + receive + {go, Info} -> + Info + after + 0 -> + busy_wait_go() + end. + id(I) -> I. %% Get code path, including the path for the erts application. diff --git a/erts/emulator/test/nif_SUITE.erl b/erts/emulator/test/nif_SUITE.erl index 4aa9cf6b9f..2f28b4a0e9 100644 --- a/erts/emulator/test/nif_SUITE.erl +++ b/erts/emulator/test/nif_SUITE.erl @@ -1152,6 +1152,7 @@ monitor_process_c(Config) -> Pid = spawn_link(fun() -> R_ptr = alloc_monitor_resource_nif(), {0,Mon} = monitor_process_nif(R_ptr, self(), true, Papa), + receive after 1000 -> ok end, [R_ptr] = monitored_by(self()), put(store, make_resource(R_ptr)), ok = release_resource(R_ptr), @@ -1159,8 +1160,8 @@ monitor_process_c(Config) -> Papa ! {self(), done, R_ptr, Mon}, exit end), - [{Pid, done, R_ptr, Mon1}, - {monitor_resource_down, R_ptr, Pid, Mon2}] = flush(2), + receive {Pid, done, R_ptr, Mon1} -> ok end, + [{monitor_resource_down, R_ptr, Pid, Mon2}] = flush(1), compare_monitors_nif(Mon1, Mon2), {R_ptr, _, 1} = last_resource_dtor_call(), ok. diff --git a/erts/emulator/test/port_SUITE.erl b/erts/emulator/test/port_SUITE.erl index 4a3ecef397..1fa6922648 100644 --- a/erts/emulator/test/port_SUITE.erl +++ b/erts/emulator/test/port_SUITE.erl @@ -1915,6 +1915,7 @@ otp_5112(Config) when is_list(Config) -> true = lists:member(Port, Links1), Port ! {self(), {command, ""}}, wait_until(fun () -> lists:member(Port, erlang:ports()) == false end), + receive after 1000 -> ok end, %% Give signal some time to propagate... {links, Links2} = process_info(self(),links), io:format("Links2: ~p~n",[Links2]), false = lists:member(Port, Links2), %% This used to fail diff --git a/erts/emulator/test/process_SUITE.erl b/erts/emulator/test/process_SUITE.erl index 0e549d424c..23a208adb2 100644 --- a/erts/emulator/test/process_SUITE.erl +++ b/erts/emulator/test/process_SUITE.erl @@ -49,6 +49,10 @@ process_info_smoke_all/1, process_info_status_handled_signal/1, process_info_reductions/1, + process_info_self_signal/1, + process_info_self_msgq_len/1, + process_info_self_msgq_len_messages/1, + process_info_self_msgq_len_more/1, bump_reductions/1, low_prio/1, binary_owner/1, yield/1, yield2/1, otp_4725/1, dist_unlink_ack_exit_leak/1, bad_register/1, garbage_collect/1, otp_6237/1, process_info_messages/1, @@ -107,21 +111,8 @@ suite() -> all() -> [spawn_with_binaries, t_exit_1, {group, t_exit_2}, trap_exit_badarg, trap_exit_badarg_in_bif, - t_process_info, process_info_other, process_info_other_msg, - process_info_other_dist_msg, process_info_other_status, - process_info_2_list, - process_info_lock_reschedule, - process_info_lock_reschedule2, - process_info_lock_reschedule3, - process_info_other_message_queue_len_signal_race, - process_info_garbage_collection, - process_info_parent, - process_info_smoke_all, - process_info_status_handled_signal, - process_info_reductions, bump_reductions, low_prio, yield, yield2, otp_4725, - dist_unlink_ack_exit_leak, - bad_register, garbage_collect, process_info_messages, + dist_unlink_ack_exit_leak, bad_register, garbage_collect, process_flag_badarg, process_flag_fullsweep_after, process_flag_heap_size, spawn_opt_heap_size, spawn_opt_max_heap_size, @@ -129,6 +120,7 @@ all() -> spawn_request_reply_option, dist_spawn_arg_list_mixup, otp_6237, + {group, process_info_bif}, {group, processes_bif}, {group, otp_7738}, garb_other_running, {group, system_task}, @@ -159,6 +151,24 @@ groups() -> processes_small_tab, processes_this_tab, processes_last_call_trap, processes_apply_trap, processes_gc_trap, processes_term_proc_list]}, + {process_info_bif, [], + [t_process_info, process_info_messages, + process_info_other, process_info_other_msg, + process_info_other_message_queue_len_signal_race, + process_info_other_dist_msg, process_info_other_status, + process_info_2_list, + process_info_lock_reschedule, + process_info_lock_reschedule2, + process_info_lock_reschedule3, + process_info_garbage_collection, + process_info_parent, + process_info_smoke_all, + process_info_status_handled_signal, + process_info_reductions, + process_info_self_signal, + process_info_self_msgq_len, + process_info_self_msgq_len_messages, + process_info_self_msgq_len_more]}, {otp_7738, [], [otp_7738_waiting, otp_7738_suspended, otp_7738_resume]}, @@ -1351,6 +1361,146 @@ pi_reductions_main_unlocker_loop(Other) -> erlang:yield(), pi_reductions_main_unlocker_loop(Other). +process_info_self_signal(Config) when is_list(Config) -> + %% Test that signals that we have sent to ourselves are + %% visible in process_info() result. This is not strictly + %% a necessary property, but implemented so now. See + %% process_info.c:process_info_bif() for more info. + Self = self(), + Ref = make_ref(), + pi_sig_spam_test(fun () -> + process_info_self_signal_spammer(Self) + end, + fun () -> + self() ! Ref, + process_info(self(), messages) + end, + fun (Res) -> + {messages, [Ref]} = Res + end). + +process_info_self_signal_spammer(To) -> + erlang:demonitor(erlang:monitor(process, To)), + process_info_self_signal_spammer(To). + +process_info_self_msgq_len(Config) when is_list(Config) -> + %% Spam ourselves with signals forcing us to flush own + %% signal queue.. + Self = self(), + pi_sig_spam_test(fun () -> + process_info_self_msgq_len_spammer(Self) + end, + fun () -> + process_info(self(), message_queue_len) + end, + fun (Res) -> + {message_queue_len, Len} = Res, + true = Len > 0, + ok + end). + + +process_info_self_msgq_len_messages(Config) when is_list(Config) -> + %% Spam ourselves with signals normally forcing us to flush own + %% signal queue, but since we also want messages wont be flushed... + Self = self(), + pi_sig_spam_test(fun () -> + process_info_self_msgq_len_spammer(Self, 100000) + end, + fun () -> + process_info(self(), + [message_queue_len, + messages]) + end, + fun (Res) -> + [{message_queue_len, Len}, + {messages, Msgs}] = Res, + Len = length(Msgs), + ok + end). + +process_info_self_msgq_len_more(Config) when is_list(Config) -> + self() ! hej, + BodyRes = process_info_self_msgq_len_more_caller_body(), + ok = process_info_self_msgq_len_more_caller_body_result(BodyRes), + TailRes = process_info_self_msgq_len_more_caller_tail(), + ok = process_info_self_msgq_len_more_caller_tail_result(TailRes), + receive hej -> ok end, + %% Check that current_function, current_location, and + %% current_stacktrace give sane results flushing or not... + Self = self(), + pi_sig_spam_test(fun () -> + process_info_self_msgq_len_spammer(Self) + end, + fun process_info_self_msgq_len_more_caller_body/0, + fun process_info_self_msgq_len_more_caller_body_result/1), + pi_sig_spam_test(fun () -> + process_info_self_msgq_len_spammer(Self) + end, + fun process_info_self_msgq_len_more_caller_tail/0, + fun process_info_self_msgq_len_more_caller_tail_result/1). + +process_info_self_msgq_len_more_caller_body() -> + Res = process_info(self(), + [message_queue_len, + current_function, + current_location, + current_stacktrace]), + id(Res). + +process_info_self_msgq_len_more_caller_body_result(Res) -> + [{message_queue_len, Len}, + {current_function, {process_SUITE,process_info_self_msgq_len_more_caller_body,0}}, + {current_location, {process_SUITE,process_info_self_msgq_len_more_caller_body,0,_}}, + {current_stacktrace, + [{process_SUITE,process_info_self_msgq_len_more_caller_body,0,_} | _]}] = Res, + true = Len > 0, + ok. + +process_info_self_msgq_len_more_caller_tail() -> + process_info(self(), + [message_queue_len, + current_function, + current_location, + current_stacktrace]). + +process_info_self_msgq_len_more_caller_tail_result(Res) -> + [{message_queue_len, Len}, + {current_function, {process_SUITE,process_info_self_msgq_len_more_caller_tail,0}}, + {current_location, {process_SUITE,process_info_self_msgq_len_more_caller_tail,0,_}}, + {current_stacktrace, + [{process_SUITE,process_info_self_msgq_len_more_caller_tail,0,_} | _]}] = Res, + true = Len > 0, + ok. + + +process_info_self_msgq_len_spammer(To) -> + process_info_self_msgq_len_spammer(To, 10000000). + +process_info_self_msgq_len_spammer(_To, 0) -> + ok; +process_info_self_msgq_len_spammer(To, N) -> + To ! hejhopp, + erlang:demonitor(erlang:monitor(process, To)), + process_info_self_msgq_len_spammer(To, N-1). + +pi_sig_spam_test(SpamFun, PITest, PICheckRes) -> + SO = erlang:system_flag(schedulers_online, 1), + try + Self = self(), + SigSpammer = spawn_link(SpamFun), + process_flag(priority, low), + receive after 10 -> ok end, + Res = PITest(), + process_flag(priority, high), + unlink(SigSpammer), + exit(SigSpammer, kill), + false = is_process_alive(SigSpammer), + PICheckRes(Res) + after + _ = erlang:system_flag(schedulers_online, SO) + end. + %% Tests erlang:bump_reductions/1. bump_reductions(Config) when is_list(Config) -> diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in index fa215618aa..64d8de67df 100644 --- a/erts/etc/unix/etp-commands.in +++ b/erts/etc/unix/etp-commands.in @@ -2479,7 +2479,7 @@ define etp-proc-state-int printf "active | " end if ($arg0 & 0x1000) - printf "unused | " + printf "maybe-self-sigs | " end if ($arg0 & 0x800) printf "exiting | " -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor