Diffstat (limited to 'rts/STM.c')
-rw-r--r-- | rts/STM.c | 383
1 file changed, 23 insertions, 360 deletions
@@ -211,15 +211,6 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
   TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
   return (result == expected);
 }
-
-static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
-  // Nothing -- uniproc
-  return true;
-}
-
-static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) {
-  // Nothing -- uniproc
-}
 #endif
 
 #if defined(STM_CG_LOCK) /*........................................*/
@@ -272,15 +263,6 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
   TRACE("%p : %s", trec, result ? "success" : "failure");
   return (result == expected);
 }
-
-static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
-  // Nothing -- protected by STM lock
-  return true;
-}
-
-static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) {
-  // Nothing -- protected by STM lock
-}
 #endif
 
 #if defined(STM_FG_LOCKS) /*...................................*/
@@ -332,32 +314,10 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec,
   TRACE("%p : %s", trec, result ? "success" : "failure");
   return (result == expected);
 }
-
-static StgBool lock_inv(StgAtomicInvariant *inv) {
-  return (cas(&(inv -> lock), 0, 1) == 0);
-}
-
-static void unlock_inv(StgAtomicInvariant *inv) {
-  ASSERT(inv -> lock == 1);
-  inv -> lock = 0;
-}
 #endif
 
 /*......................................................................*/
 
-static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
-  StgClosure *c = q -> closure;
-  const StgInfoTable *info = get_itbl(c);
-  return (info -> type) == TSO;
-}
-
-static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
-  StgClosure *c = q -> closure;
-  return (c->header.info == &stg_ATOMIC_INVARIANT_info);
-}
-
-/*......................................................................*/
-
 // Helper functions for thread blocking and unblocking
 
 static void park_tso(StgTSO *tso) {
@@ -372,24 +332,24 @@ static void unpark_tso(Capability *cap, StgTSO *tso) {
     // queues: it's up to the thread itself to remove it from the wait queues
     // if it decides to do so when it is scheduled.
 
-    // Unblocking a TSO from BlockedOnSTM is done under the TSO lock,
-    // to avoid multiple CPUs unblocking the same TSO, and also to
-    // synchronise with throwTo(). The first time the TSO is unblocked
-    // we mark this fact by setting block_info.closure == STM_AWOKEN.
-    // This way we can avoid sending further wakeup messages in the
-    // future.
-    lockTSO(tso);
-    if (tso->why_blocked == BlockedOnSTM &&
-        tso->block_info.closure == &stg_STM_AWOKEN_closure) {
-        TRACE("unpark_tso already woken up tso=%p", tso);
-    } else if (tso -> why_blocked == BlockedOnSTM) {
-        TRACE("unpark_tso on tso=%p", tso);
-        tso->block_info.closure = &stg_STM_AWOKEN_closure;
-        tryWakeupThread(cap,tso);
-    } else {
-        TRACE("spurious unpark_tso on tso=%p", tso);
-    }
-    unlockTSO(tso);
+    // Only the capability that owns this TSO may unblock it. We can
+    // call tryWakeupThread() which will either unblock it directly if
+    // it belongs to this cap, or send a message to the owning cap
+    // otherwise.
+
+    // TODO: This sends multiple messages if we write to the same TVar multiple
+    // times and the owning cap hasn't yet woken up the thread and removed it
+    // from the TVar's watch list. We tried to optimise this in D4961, but that
+    // patch was incorrect and broke other things, see #15544 comment:17. See
+    // #15626 for the tracking ticket.
+
+    // Safety Note: we hold the TVar lock at this point, so we know
+    // that this thread is definitely still blocked, since the first
+    // thing a thread will do when it runs is remove itself from the
+    // TVar watch queues, and to do that it would need to lock the
+    // TVar.
+
+    tryWakeupThread(cap,tso);
 }
 
 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
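Aside: the Safety Note above carries the whole correctness argument for the simpler wakeup path, so here is a standalone analogy of it in plain C with pthreads. This is a sketch, not RTS code: Waiter, wait_list, wake_all, and block_self are invented names, and the mutex stands in for the TVar lock. It demonstrates the same property: because the waker holds the list lock for the entire traversal, no waiter can have unlinked itself yet, so every wakeup targets a thread that is genuinely still blocked.

    #include <pthread.h>
    #include <stdbool.h>

    typedef struct Waiter {
        struct Waiter *next;
        bool woken;
    } Waiter;

    static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  wakeup    = PTHREAD_COND_INITIALIZER;
    static Waiter *wait_list = NULL;

    /* Waker side (cf. unpark_waiters_on()/unpark_tso()): the list lock
     * is held across the traversal, so every Waiter we touch is still
     * blocked and still on the list. */
    void wake_all(void) {
        pthread_mutex_lock(&list_lock);
        for (Waiter *w = wait_list; w != NULL; w = w->next)
            w->woken = true;            /* safe: w cannot have left the list */
        pthread_cond_broadcast(&wakeup);
        pthread_mutex_unlock(&list_lock);
    }

    /* Waiter side: a resumed waiter must retake the list lock before it
     * can unlink itself, mirroring an STM thread relocking the TVar
     * before it leaves the watch queue. */
    void block_self(Waiter *self) {
        pthread_mutex_lock(&list_lock);
        self->woken = false;
        self->next = wait_list;
        wait_list = self;
        while (!self->woken)
            pthread_cond_wait(&wakeup, &list_lock);
        for (Waiter **p = &wait_list; ; p = &(*p)->next)
            if (*p == self) { *p = self->next; break; }
        pthread_mutex_unlock(&list_lock);
    }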
@@ -406,9 +366,7 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) {
   for (;
        q != END_STM_WATCH_QUEUE;
        q = q -> prev_queue_entry) {
-    if (watcher_is_tso(q)) {
       unpark_tso(cap, (StgTSO *)(q -> closure));
-    }
   }
 }
 
@@ -416,16 +374,6 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) {
 
 // Helper functions for downstream allocation and initialization
 
-static StgInvariantCheckQueue *new_stg_invariant_check_queue(Capability *cap,
-                                                             StgAtomicInvariant *invariant) {
-  StgInvariantCheckQueue *result;
-  result = (StgInvariantCheckQueue *)allocate(cap, sizeofW(StgInvariantCheckQueue));
-  SET_HDR (result, &stg_INVARIANT_CHECK_QUEUE_info, CCS_SYSTEM);
-  result -> invariant = invariant;
-  result -> my_execution = NO_TREC;
-  return result;
-}
-
 static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
                                                    StgClosure *closure) {
   StgTVarWatchQueue *result;
@@ -452,7 +400,6 @@ static StgTRecHeader *new_stg_trec_header(Capability *cap,
 
   result -> enclosing_trec = enclosing_trec;
   result -> current_chunk = new_stg_trec_chunk(cap);
-  result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
 
   if (enclosing_trec == NO_TREC) {
     result -> state = TREC_ACTIVE;
@@ -470,20 +417,6 @@ static StgTRecHeader *new_stg_trec_header(Capability *cap,
 // Allocation / deallocation functions that retain per-capability lists
 // of closures that can be re-used
 
-static StgInvariantCheckQueue *alloc_stg_invariant_check_queue(Capability *cap,
-                                                               StgAtomicInvariant *invariant) {
-  StgInvariantCheckQueue *result = NULL;
-  if (cap -> free_invariant_check_queues == END_INVARIANT_CHECK_QUEUE) {
-    result = new_stg_invariant_check_queue(cap, invariant);
-  } else {
-    result = cap -> free_invariant_check_queues;
-    result -> invariant = invariant;
-    result -> my_execution = NO_TREC;
-    cap -> free_invariant_check_queues = result -> next_queue_entry;
-  }
-  return result;
-}
-
 static StgTVarWatchQueue *alloc_stg_tvar_watch_queue(Capability *cap,
                                                      StgClosure *closure) {
   StgTVarWatchQueue *result = NULL;
@@ -536,7 +469,6 @@ static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
     cap -> free_trec_headers = result -> enclosing_trec;
     result -> enclosing_trec = enclosing_trec;
    result -> current_chunk -> next_entry_idx = 0;
-    result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
     if (enclosing_trec == NO_TREC) {
       result -> state = TREC_ACTIVE;
     } else {
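Aside: alloc_stg_tvar_watch_queue() above keeps the recycling pattern that the deleted alloc_stg_invariant_check_queue() also used: pop a previously freed entry from a per-capability LIFO free list when one is available, and fall back to a fresh allocation otherwise. A minimal self-contained sketch of the pattern, with simplified stand-in types (Node, Cap) rather than the RTS definitions and malloc in place of the RTS allocate():

    #include <stdlib.h>

    typedef struct Node {
        struct Node *next_queue_entry;
        void *closure;
    } Node;

    typedef struct {
        Node *free_list;    /* cf. cap -> free_tvar_watch_queues */
    } Cap;

    static Node *alloc_node(Cap *cap, void *closure) {
        Node *result;
        if (cap->free_list == NULL) {
            result = malloc(sizeof(Node));  /* slow path: fresh allocation
                                               (error handling elided) */
        } else {
            result = cap->free_list;        /* fast path: reuse a freed node */
            cap->free_list = result->next_queue_entry;
        }
        result->closure = closure;
        return result;
    }

    static void free_node(Cap *cap, Node *n) {
        n->next_queue_entry = cap->free_list;  /* push back for reuse */
        cap->free_list = n;
    }

Because each capability owns its own list, both paths run without any synchronization, which is the point of keeping these caches per capability.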
@@ -1111,202 +1043,8 @@ static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeade
 
 /*......................................................................*/
 
-/*
- * Add/remove links between an invariant and its TVars. The caller must have
- * locked the TVars involved and the invariant.
- */
-
-static void disconnect_invariant(Capability *cap,
-                                 StgAtomicInvariant *inv) {
-  StgTRecHeader *last_execution = inv -> last_execution;
-
-  TRACE("unhooking last execution inv=%p trec=%p", inv, last_execution);
-
-  FOR_EACH_ENTRY(last_execution, e, {
-    StgTVar *s = e -> tvar;
-    StgTVarWatchQueue *q = s -> first_watch_queue_entry;
-    DEBUG_ONLY( StgBool found = false );
-    TRACE("  looking for trec on tvar=%p", s);
-    for (q = s -> first_watch_queue_entry;
-         q != END_STM_WATCH_QUEUE;
-         q = q -> next_queue_entry) {
-      if (q -> closure == (StgClosure*)inv) {
-        StgTVarWatchQueue *pq;
-        StgTVarWatchQueue *nq;
-        nq = q -> next_queue_entry;
-        pq = q -> prev_queue_entry;
-        if (nq != END_STM_WATCH_QUEUE) {
-          nq -> prev_queue_entry = pq;
-        }
-        if (pq != END_STM_WATCH_QUEUE) {
-          pq -> next_queue_entry = nq;
-        } else {
-          ASSERT(s -> first_watch_queue_entry == q);
-          s -> first_watch_queue_entry = nq;
-          dirty_TVAR(cap,s); // we modified first_watch_queue_entry
-        }
-        TRACE("  found it in watch queue entry %p", q);
-        free_stg_tvar_watch_queue(cap, q);
-        DEBUG_ONLY( found = true );
-        break;
-      }
-    }
-    ASSERT(found);
-  });
-  inv -> last_execution = NO_TREC;
-}
-
-static void connect_invariant_to_trec(Capability *cap,
-                                      StgAtomicInvariant *inv,
-                                      StgTRecHeader *my_execution) {
-  TRACE("connecting execution inv=%p trec=%p", inv, my_execution);
-
-  ASSERT(inv -> last_execution == NO_TREC);
-
-  FOR_EACH_ENTRY(my_execution, e, {
-    StgTVar *s = e -> tvar;
-    StgTVarWatchQueue *q = alloc_stg_tvar_watch_queue(cap, (StgClosure*)inv);
-    StgTVarWatchQueue *fq = s -> first_watch_queue_entry;
-
-    // We leave "last_execution" holding the values that will be
-    // in the heap after the transaction we're in the process
-    // of committing has finished.
-    TRecEntry *entry = get_entry_for(my_execution -> enclosing_trec, s, NULL);
-    if (entry != NULL) {
-      e -> expected_value = entry -> new_value;
-      e -> new_value = entry -> new_value;
-    }
-
-    TRACE("  linking trec on tvar=%p value=%p q=%p", s, e -> expected_value, q);
-    q -> next_queue_entry = fq;
-    q -> prev_queue_entry = END_STM_WATCH_QUEUE;
-    if (fq != END_STM_WATCH_QUEUE) {
-      fq -> prev_queue_entry = q;
-    }
-    s -> first_watch_queue_entry = q;
-    dirty_TVAR(cap,s); // we modified first_watch_queue_entry
-  });
-
-  inv -> last_execution = my_execution;
-}
-
-/*
- * Add a new invariant to the trec's list of invariants to check on commit
- */
-
-void stmAddInvariantToCheck(Capability *cap,
-                            StgTRecHeader *trec,
-                            StgClosure *code) {
-  StgAtomicInvariant *invariant;
-  StgInvariantCheckQueue *q;
-  TRACE("%p : stmAddInvariantToCheck closure=%p", trec, code);
-  ASSERT(trec != NO_TREC);
-  ASSERT(trec -> state == TREC_ACTIVE ||
-         trec -> state == TREC_CONDEMNED);
-
-  // 1. Allocate an StgAtomicInvariant, set last_execution to NO_TREC
-  //    to signal that this is a new invariant in the current atomic block
-
-  invariant = (StgAtomicInvariant *) allocate(cap, sizeofW(StgAtomicInvariant));
-  TRACE("%p : stmAddInvariantToCheck allocated invariant=%p", trec, invariant);
-  SET_HDR (invariant, &stg_ATOMIC_INVARIANT_info, CCS_SYSTEM);
-  invariant -> code = code;
-  invariant -> last_execution = NO_TREC;
-  invariant -> lock = 0;
-
-  // 2. Allocate an StgInvariantCheckQueue entry, link it to the current trec
-
-  q = alloc_stg_invariant_check_queue(cap, invariant);
-  TRACE("%p : stmAddInvariantToCheck allocated q=%p", trec, q);
-  q -> invariant = invariant;
-  q -> my_execution = NO_TREC;
-  q -> next_queue_entry = trec -> invariants_to_check;
-  trec -> invariants_to_check = q;
-
-  TRACE("%p : stmAddInvariantToCheck done", trec);
-}
-
-/*
- * Fill in the trec's list of invariants that might be violated by the
- * current transaction.
- */
-
-StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *trec) {
-  StgTRecChunk *c;
-  TRACE("%p : stmGetInvariantsToCheck, head was %p",
-        trec,
-        trec -> invariants_to_check);
-
-  ASSERT(trec != NO_TREC);
-  ASSERT((trec -> state == TREC_ACTIVE) ||
-         (trec -> state == TREC_WAITING) ||
-         (trec -> state == TREC_CONDEMNED));
-  ASSERT(trec -> enclosing_trec == NO_TREC);
-
-  lock_stm(trec);
-  c = trec -> current_chunk;
-  while (c != END_STM_CHUNK_LIST) {
-    unsigned int i;
-    for (i = 0; i < c -> next_entry_idx; i ++) {
-      TRecEntry *e = &(c -> entries[i]);
-      if (entry_is_update(e)) {
-        StgTVar *s = e -> tvar;
-        StgClosure *old = lock_tvar(trec, s);
-
-        // Pick up any invariants on the TVar being updated
-        // by entry "e"
-
-        StgTVarWatchQueue *q;
-        TRACE("%p : checking for invariants on %p", trec, s);
-        for (q = s -> first_watch_queue_entry;
-             q != END_STM_WATCH_QUEUE;
-             q = q -> next_queue_entry) {
-          if (watcher_is_invariant(q)) {
-            StgBool found = false;
-            StgInvariantCheckQueue *q2;
-            TRACE("%p : Touching invariant %p", trec, q -> closure);
-            for (q2 = trec -> invariants_to_check;
-                 q2 != END_INVARIANT_CHECK_QUEUE;
-                 q2 = q2 -> next_queue_entry) {
-              if (q2 -> invariant == (StgAtomicInvariant*)(q -> closure)) {
-                TRACE("%p : Already found %p", trec, q -> closure);
-                found = true;
-                break;
-              }
-            }
-
-            if (!found) {
-              StgInvariantCheckQueue *q3;
-              TRACE("%p : Not already found %p", trec, q -> closure);
-              q3 = alloc_stg_invariant_check_queue(cap,
-                                                   (StgAtomicInvariant*) q -> closure);
-              q3 -> next_queue_entry = trec -> invariants_to_check;
-              trec -> invariants_to_check = q3;
-            }
-          }
-        }
-
-        unlock_tvar(cap, trec, s, old, false);
-      }
-    }
-    c = c -> prev_chunk;
-  }
-
-  unlock_stm(trec);
-
-  TRACE("%p : stmGetInvariantsToCheck, head now %p",
-        trec,
-        trec -> invariants_to_check);
-
-  return (trec -> invariants_to_check);
-}
-
-/*......................................................................*/
-
 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   StgInt64 max_commits_at_start = max_commits;
-  StgBool touched_invariants;
-  StgBool use_read_phase;
 
   TRACE("%p : stmCommitTransaction()", trec);
   ASSERT(trec != NO_TREC);
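Aside: the bulk of the deleted disconnect_invariant() above is a careful unlink from a TVar's watch queue, a doubly linked list that marks both ends with a sentinel (END_STM_WATCH_QUEUE) rather than NULL. A self-contained sketch of just that unlink logic, with invented stand-in types (Entry, unlink_watcher) in place of the RTS definitions:

    #include <assert.h>

    typedef struct Entry {
        struct Entry *next, *prev;
        void *watcher;
    } Entry;

    static Entry end_sentinel;          /* plays the role of END_STM_WATCH_QUEUE */
    #define END (&end_sentinel)

    /* Unlink the entry watching w. Updating *head corresponds to the
     * branch where STM.c also calls dirty_TVAR(), because the TVar's
     * first_watch_queue_entry field was mutated. */
    static void unlink_watcher(Entry **head, void *w) {
        for (Entry *q = *head; q != END; q = q->next) {
            if (q->watcher == w) {
                if (q->next != END) q->next->prev = q->prev;
                if (q->prev != END) {
                    q->prev->next = q->next;
                } else {
                    assert(*head == q);
                    *head = q->next;    /* removed the first entry */
                }
                return;
            }
        }
        assert(0 && "watcher not on the queue");
    }

The sentinel keeps the traversal free of NULL checks; only the head pointer needs special-casing, exactly as in the deleted code.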
@@ -1317,69 +1055,15 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   ASSERT((trec -> state == TREC_ACTIVE) ||
          (trec -> state == TREC_CONDEMNED));
 
-  // touched_invariants is true if we've written to a TVar with invariants
-  // attached to it, or if we're trying to add a new invariant to the system.
-
-  touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);
-
-  // If we have touched invariants then (i) lock the invariant, and (ii) add
-  // the invariant's read set to our own. Step (i) is needed to serialize
-  // concurrent transactions that attempt to make conflicting updates
-  // to the invariant's trec (suppose it read from t1 and t2, and that one
-  // concurrent transaction writes only to t1, and a second writes only to
-  // t2). Step (ii) is needed so that both transactions will lock t1 and t2
-  // to gain access to their wait lists (and hence be able to unhook the
-  // invariant from both tvars).
-
-  if (touched_invariants) {
-    StgInvariantCheckQueue *q = trec -> invariants_to_check;
-    TRACE("%p : locking invariants", trec);
-    while (q != END_INVARIANT_CHECK_QUEUE) {
-      StgTRecHeader *inv_old_trec;
-      StgAtomicInvariant *inv;
-      TRACE("%p : locking invariant %p", trec, q -> invariant);
-      inv = q -> invariant;
-      if (!lock_inv(inv)) {
-        TRACE("%p : failed to lock %p", trec, inv);
-        trec -> state = TREC_CONDEMNED;
-        break;
-      }
-
-      inv_old_trec = inv -> last_execution;
-      if (inv_old_trec != NO_TREC) {
-        StgTRecChunk *c = inv_old_trec -> current_chunk;
-        while (c != END_STM_CHUNK_LIST) {
-          unsigned int i;
-          for (i = 0; i < c -> next_entry_idx; i ++) {
-            TRecEntry *e = &(c -> entries[i]);
-            TRACE("%p : ensuring we lock TVars for %p", trec, e -> tvar);
-            merge_read_into (cap, trec, e -> tvar, e -> expected_value);
-          }
-          c = c -> prev_chunk;
-        }
-      }
-      q = q -> next_queue_entry;
-    }
-    TRACE("%p : finished locking invariants", trec);
-  }
-
   // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
-  // (i) the configuration lets us use a read phase, and (ii) we've not
-  // touched or introduced any invariants.
-  //
-  // In principle we could extend the implementation to support a read-phase
-  // and invariants, but it complicates the logic: the links between
-  // invariants and TVars are managed by the TVar watch queues which are
-  // protected by the TVar's locks.
-
-  use_read_phase = ((config_use_read_phase) && (!touched_invariants));
+  // the configuration lets us use a read phase.
 
-  bool result = validate_and_acquire_ownership(cap, trec, (!use_read_phase), true);
+  bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
   if (result) {
     // We now know that all the updated locations hold their expected values.
     ASSERT(trec -> state == TREC_ACTIVE);
 
-    if (use_read_phase) {
+    if (config_use_read_phase) {
       StgInt64 max_commits_at_end;
       StgInt64 max_concurrent_commits;
       TRACE("%p : doing read check", trec);
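Aside: lock_inv()/unlock_inv(), deleted earlier in this patch and last used by the loop removed above, amount to a one-word try-lock built on compare-and-swap. A self-contained equivalent using C11 atomics (the RTS uses its own cas() primitive instead; Inv and try_lock_inv are stand-in names):

    #include <assert.h>
    #include <stdatomic.h>
    #include <stdbool.h>

    typedef struct {
        atomic_uint lock;   /* 0 = unlocked, 1 = held */
    } Inv;

    /* Succeeds iff the lock word was 0, leaving it set to 1. */
    static bool try_lock_inv(Inv *inv) {
        unsigned int expected = 0;
        return atomic_compare_exchange_strong(&inv->lock, &expected, 1);
    }

    static void unlock_inv(Inv *inv) {
        assert(atomic_load(&inv->lock) == 1);
        atomic_store(&inv->lock, 0);
    }

Note that the deleted commit path never spun on this lock: a failed try-lock condemned the whole transaction instead, which is why a plain CAS was enough.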
@@ -1399,32 +1083,11 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
     // at the end of the call to validate_and_acquire_ownership. This forms the
     // linearization point of the commit.
 
-    // 1. If we have touched or introduced any invariants then unhook them
-    //    from the TVars they depended on last time they were executed
-    //    and hook them on the TVars that they now depend on.
-    if (touched_invariants) {
-      StgInvariantCheckQueue *q = trec -> invariants_to_check;
-      while (q != END_INVARIANT_CHECK_QUEUE) {
-        StgAtomicInvariant *inv = q -> invariant;
-        if (inv -> last_execution != NO_TREC) {
-          disconnect_invariant(cap, inv);
-        }
-
-        TRACE("%p : hooking up new execution trec=%p", trec, q -> my_execution);
-        connect_invariant_to_trec(cap, inv, q -> my_execution);
-
-        TRACE("%p : unlocking invariant %p", trec, inv);
-        unlock_inv(inv);
-
-        q = q -> next_queue_entry;
-      }
-    }
-
-    // 2. Make the updates required by the transaction
+    // Make the updates required by the transaction.
     FOR_EACH_ENTRY(trec, e, {
       StgTVar *s;
       s = e -> tvar;
-      if ((!use_read_phase) || (e -> new_value != e -> expected_value)) {
+      if ((!config_use_read_phase) || (e -> new_value != e -> expected_value)) {
         // Either the entry is an update or we're not using a read phase:
         // write the value back to the TVar, unlocking it if necessary.
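Aside: with the invariant cases gone, the function above reduces to a classic read-phase commit: lock only the TVars you will write, re-validate everything you merely read, then write back. A self-contained toy version of that shape (TVar, Entry, and commit are invented stand-ins; a pthread mutex replaces the per-TVar lock, and GHC's retry, GC, and lock-ordering machinery are all elided):

    #include <pthread.h>
    #include <stdbool.h>

    typedef struct {
        pthread_mutex_t lock;   /* stands in for the per-TVar lock */
        int value;
    } TVar;

    typedef struct {
        TVar *tvar;
        int expected_value;     /* value observed when first read */
        int new_value;          /* equal to expected_value for a pure read */
    } Entry;

    bool commit(Entry *entries, int n) {
        bool ok = true;

        /* 1. Acquire ownership of the TVars we intend to update.
         *    (A real implementation must avoid deadlock here; STM.c
         *    validates with conditional locking and condemns the
         *    transaction on conflict rather than blocking.) */
        for (int i = 0; i < n; i++)
            if (entries[i].new_value != entries[i].expected_value)
                pthread_mutex_lock(&entries[i].tvar->lock);

        /* 2. Read phase: every entry must still hold the value we saw.
         *    Pure reads were never locked, so a concurrent commit may
         *    have invalidated them; this check catches that. */
        for (int i = 0; i < n; i++)
            if (entries[i].tvar->value != entries[i].expected_value)
                ok = false;

        /* 3. Write back on success, releasing locks either way. This is
         *    the same guard as in FOR_EACH_ENTRY above: under a read
         *    phase, an entry with new_value == expected_value was never
         *    locked and must not be written. */
        for (int i = 0; i < n; i++) {
            if (entries[i].new_value != entries[i].expected_value) {
                if (ok) entries[i].tvar->value = entries[i].new_value;
                pthread_mutex_unlock(&entries[i].tvar->lock);
            }
        }
        return ok;
    }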