summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortharris@microsoft.com <unknown>2006-10-07 12:29:07 +0000
committertharris@microsoft.com <unknown>2006-10-07 12:29:07 +0000
commit9cef40bd4dd2536c7a370a1a9b78461c152805cc (patch)
tree7a9657ce6af6880cc73775ddeeaccab4cecb7e12
parent87c36991c22f208623c96506c64c6c163361e45b (diff)
downloadhaskell-9cef40bd4dd2536c7a370a1a9b78461c152805cc.tar.gz
STM invariants
-rw-r--r--compiler/prelude/primops.txt.pp7
-rw-r--r--includes/ClosureTypes.h18
-rw-r--r--includes/Closures.h56
-rw-r--r--includes/Cmm.h5
-rw-r--r--includes/STM.h24
-rw-r--r--includes/StgMiscClosures.h18
-rw-r--r--includes/Storage.h8
-rw-r--r--includes/mkDerivedConstants.c13
-rw-r--r--rts/Capability.c3
-rw-r--r--rts/Capability.h3
-rw-r--r--rts/ClosureFlags.c6
-rw-r--r--rts/Exception.cmm26
-rw-r--r--rts/GC.c122
-rw-r--r--rts/GCCompact.c28
-rw-r--r--rts/LdvProfile.c4
-rw-r--r--rts/Linker.c1
-rw-r--r--rts/PrimOps.cmm179
-rw-r--r--rts/Printer.c4
-rw-r--r--rts/ProfHeap.c14
-rw-r--r--rts/RaiseAsync.c1
-rw-r--r--rts/RetainerProfile.c14
-rw-r--r--rts/STM.c743
-rw-r--r--rts/Sanity.c25
-rw-r--r--rts/Schedule.c22
-rw-r--r--rts/StgMiscClosures.cmm21
25 files changed, 1041 insertions, 324 deletions
diff --git a/compiler/prelude/primops.txt.pp b/compiler/prelude/primops.txt.pp
index 13b4b6c97d..ef5dfc9997 100644
--- a/compiler/prelude/primops.txt.pp
+++ b/compiler/prelude/primops.txt.pp
@@ -1282,6 +1282,13 @@ primop CatchSTMOp "catchSTM#" GenPrimOp
out_of_line = True
has_side_effects = True
+primop Check "check#" GenPrimOp
+ (State# RealWorld -> (# State# RealWorld, a #) )
+ -> (State# RealWorld -> (# State# RealWorld, () #) )
+ with
+ out_of_line = True
+ has_side_effects = True
+
primop NewTVarOp "newTVar#" GenPrimOp
a
-> State# s -> (# State# s, TVar# s a #)
diff --git a/includes/ClosureTypes.h b/includes/ClosureTypes.h
index ae2aab3407..ff4dec1215 100644
--- a/includes/ClosureTypes.h
+++ b/includes/ClosureTypes.h
@@ -85,13 +85,15 @@
#define RBH 61
#define EVACUATED 62
#define REMOTE_REF 63
-#define TVAR_WAIT_QUEUE 64
-#define TVAR 65
-#define TREC_CHUNK 66
-#define TREC_HEADER 67
-#define ATOMICALLY_FRAME 68
-#define CATCH_RETRY_FRAME 69
-#define CATCH_STM_FRAME 70
-#define N_CLOSURE_TYPES 71
+#define TVAR_WATCH_QUEUE 64
+#define INVARIANT_CHECK_QUEUE 65
+#define ATOMIC_INVARIANT 66
+#define TVAR 67
+#define TREC_CHUNK 68
+#define TREC_HEADER 69
+#define ATOMICALLY_FRAME 70
+#define CATCH_RETRY_FRAME 71
+#define CATCH_STM_FRAME 72
+#define N_CLOSURE_TYPES 73
#endif /* CLOSURETYPES_H */
diff --git a/includes/Closures.h b/includes/Closures.h
index 3df208cd09..d5458f4451 100644
--- a/includes/Closures.h
+++ b/includes/Closures.h
@@ -331,7 +331,7 @@ typedef struct {
* space for these data structures at the cost of more complexity in the
* implementation:
*
- * - In StgTVar, current_value and first_wait_queue_entry could be held in
+ * - In StgTVar, current_value and first_watch_queue_entry could be held in
* the same field: if any thread is waiting then its expected_value for
* the tvar is the current value.
*
@@ -345,24 +345,33 @@ typedef struct {
* (it immediately switches on frame->waiting anyway).
*/
-typedef struct StgTVarWaitQueue_ {
+typedef struct StgTRecHeader_ StgTRecHeader;
+
+typedef struct StgTVarWatchQueue_ {
StgHeader header;
- struct StgTSO_ *waiting_tso;
- struct StgTVarWaitQueue_ *next_queue_entry;
- struct StgTVarWaitQueue_ *prev_queue_entry;
-} StgTVarWaitQueue;
+ StgClosure *closure; // StgTSO or StgAtomicInvariant
+ struct StgTVarWatchQueue_ *next_queue_entry;
+ struct StgTVarWatchQueue_ *prev_queue_entry;
+} StgTVarWatchQueue;
typedef struct {
StgHeader header;
StgClosure *volatile current_value;
- StgTVarWaitQueue *volatile first_wait_queue_entry;
+ StgTVarWatchQueue *volatile first_watch_queue_entry;
#if defined(THREADED_RTS)
StgInt volatile num_updates;
#endif
} StgTVar;
+typedef struct {
+ StgHeader header;
+ StgClosure *code;
+ StgTRecHeader *last_execution;
+ StgWord lock;
+} StgAtomicInvariant;
+
/* new_value == expected_value for read-only accesses */
-/* new_value is a StgTVarWaitQueue entry when trec in state TREC_WAITING */
+/* new_value is a StgTVarWatchQueue entry when trec in state TREC_WAITING */
typedef struct {
StgTVar *tvar;
StgClosure *expected_value;
@@ -389,29 +398,38 @@ typedef enum {
TREC_WAITING, /* Transaction currently waiting */
} TRecState;
-typedef struct StgTRecHeader_ {
+typedef struct StgInvariantCheckQueue_ {
+ StgHeader header;
+ StgAtomicInvariant *invariant;
+ StgTRecHeader *my_execution;
+ struct StgInvariantCheckQueue_ *next_queue_entry;
+} StgInvariantCheckQueue;
+
+struct StgTRecHeader_ {
StgHeader header;
TRecState state;
struct StgTRecHeader_ *enclosing_trec;
StgTRecChunk *current_chunk;
-} StgTRecHeader;
+ StgInvariantCheckQueue *invariants_to_check;
+};
typedef struct {
- StgHeader header;
- StgClosure *code;
+ StgHeader header;
+ StgClosure *code;
+ StgTVarWatchQueue *next_invariant_to_check;
} StgAtomicallyFrame;
typedef struct {
- StgHeader header;
- StgClosure *handler;
+ StgHeader header;
+ StgClosure *code;
+ StgClosure *handler;
} StgCatchSTMFrame;
typedef struct {
- StgHeader header;
- StgBool running_alt_code;
- StgClosure *first_code;
- StgClosure *alt_code;
- StgTRecHeader *first_code_trec;
+ StgHeader header;
+ StgBool running_alt_code;
+ StgClosure *first_code;
+ StgClosure *alt_code;
} StgCatchRetryFrame;
#if defined(PAR) || defined(GRAN)
diff --git a/includes/Cmm.h b/includes/Cmm.h
index d58eebc424..25ffb5d24f 100644
--- a/includes/Cmm.h
+++ b/includes/Cmm.h
@@ -513,8 +513,9 @@
Misc junk
-------------------------------------------------------------------------- */
-#define NO_TREC stg_NO_TREC_closure
-#define END_TSO_QUEUE stg_END_TSO_QUEUE_closure
+#define NO_TREC stg_NO_TREC_closure
+#define END_TSO_QUEUE stg_END_TSO_QUEUE_closure
+#define END_INVARIANT_CHECK_QUEUE stg_END_INVARIANT_CHECK_QUEUE_closure
#define dirtyTSO(tso) \
StgTSO_flags(tso) = StgTSO_flags(tso) | TSO_DIRTY::I32;
diff --git a/includes/STM.h b/includes/STM.h
index ebbf193b0b..3bf976551d 100644
--- a/includes/STM.h
+++ b/includes/STM.h
@@ -66,14 +66,13 @@ extern StgTRecHeader *stmStartNestedTransaction(Capability *cap, StgTRecHeader *
);
/*
- * Exit the current transaction context, abandoning any read/write
- * operations performed within it and removing the thread from any
- * tvar wait queues if it was waitin. Note that if nested transactions
- * are not fully supported then this may leave the enclosing
- * transaction contexts doomed to abort.
+ * Roll back the current transatcion context. NB: if this is a nested tx
+ * then we merge its read set into its parents. This is because a change
+ * to that read set could change whether or not the tx should abort.
*/
extern void stmAbortTransaction(Capability *cap, StgTRecHeader *trec);
+extern void stmFreeAbortedTRec(Capability *cap, StgTRecHeader *trec);
/*
* Ensure that a subsequent commit / validation will fail. We use this
@@ -149,6 +148,18 @@ extern StgBool stmValidateNestOfTransactions(StgTRecHeader *trec);
*/
/*
+ * Fill in the trec's list of invariants that might be violated by the current
+ * transaction.
+ */
+
+extern StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap,
+ StgTRecHeader *trec);
+
+extern void stmAddInvariantToCheck(Capability *cap,
+ StgTRecHeader *trec,
+ StgClosure *code);
+
+/*
* Test whether the current transaction context is valid and, if so,
* commit its memory accesses to the heap. stmCommitTransaction must
* unblock any threads which are waiting on tvars that updates have
@@ -218,7 +229,8 @@ extern void stmWriteTVar(Capability *cap,
/* NULLs */
-#define END_STM_WAIT_QUEUE ((StgTVarWaitQueue *)(void *)&stg_END_STM_WAIT_QUEUE_closure)
+#define END_STM_WATCH_QUEUE ((StgTVarWatchQueue *)(void *)&stg_END_STM_WATCH_QUEUE_closure)
+#define END_INVARIANT_CHECK_QUEUE ((StgInvariantCheckQueue *)(void *)&stg_END_INVARIANT_CHECK_QUEUE_closure)
#define END_STM_CHUNK_LIST ((StgTRecChunk *)(void *)&stg_END_STM_CHUNK_LIST_closure)
#define NO_TREC ((StgTRecHeader *)(void *)&stg_NO_TREC_closure)
diff --git a/includes/StgMiscClosures.h b/includes/StgMiscClosures.h
index 8eabace8a6..4f638ea9ce 100644
--- a/includes/StgMiscClosures.h
+++ b/includes/StgMiscClosures.h
@@ -136,11 +136,14 @@ RTS_INFO(stg_AP_info);
RTS_INFO(stg_AP_STACK_info);
RTS_INFO(stg_dummy_ret_info);
RTS_INFO(stg_raise_info);
-RTS_INFO(stg_TVAR_WAIT_QUEUE_info);
+RTS_INFO(stg_TVAR_WATCH_QUEUE_info);
+RTS_INFO(stg_INVARIANT_CHECK_QUEUE_info);
+RTS_INFO(stg_ATOMIC_INVARIANT_info);
RTS_INFO(stg_TVAR_info);
RTS_INFO(stg_TREC_CHUNK_info);
RTS_INFO(stg_TREC_HEADER_info);
-RTS_INFO(stg_END_STM_WAIT_QUEUE_info);
+RTS_INFO(stg_END_STM_WATCH_QUEUE_info);
+RTS_INFO(stg_END_INVARIANT_CHECK_QUEUE_info);
RTS_INFO(stg_END_STM_CHUNK_LIST_info);
RTS_INFO(stg_NO_TREC_info);
@@ -197,11 +200,14 @@ RTS_ENTRY(stg_AP_entry);
RTS_ENTRY(stg_AP_STACK_entry);
RTS_ENTRY(stg_dummy_ret_entry);
RTS_ENTRY(stg_raise_entry);
-RTS_ENTRY(stg_END_STM_WAIT_QUEUE_entry);
+RTS_ENTRY(stg_END_STM_WATCH_QUEUE_entry);
+RTS_ENTRY(stg_END_INVARIANT_CHECK_QUEUE_entry);
RTS_ENTRY(stg_END_STM_CHUNK_LIST_entry);
RTS_ENTRY(stg_NO_TREC_entry);
RTS_ENTRY(stg_TVAR_entry);
-RTS_ENTRY(stg_TVAR_WAIT_QUEUE_entry);
+RTS_ENTRY(stg_TVAR_WATCH_QUEUE_entry);
+RTS_ENTRY(stg_INVARIANT_CHECK_QUEUE_entry);
+RTS_ENTRY(stg_ATOMIC_INVARIANT_entry);
RTS_ENTRY(stg_TREC_CHUNK_entry);
RTS_ENTRY(stg_TREC_HEADER_entry);
@@ -224,7 +230,8 @@ RTS_CLOSURE(stg_NO_FINALIZER_closure);
RTS_CLOSURE(stg_dummy_ret_closure);
RTS_CLOSURE(stg_forceIO_closure);
-RTS_CLOSURE(stg_END_STM_WAIT_QUEUE_closure);
+RTS_CLOSURE(stg_END_STM_WATCH_QUEUE_closure);
+RTS_CLOSURE(stg_END_INVARIANT_CHECK_QUEUE_closure);
RTS_CLOSURE(stg_END_STM_CHUNK_LIST_closure);
RTS_CLOSURE(stg_NO_TREC_closure);
@@ -605,5 +612,6 @@ RTS_FUN(atomicallyzh_fast);
RTS_FUN(newTVarzh_fast);
RTS_FUN(readTVarzh_fast);
RTS_FUN(writeTVarzh_fast);
+RTS_FUN(checkzh_fast);
#endif /* STGMISCCLOSURES_H */
diff --git a/includes/Storage.h b/includes/Storage.h
index 1886e0921e..0b93378083 100644
--- a/includes/Storage.h
+++ b/includes/Storage.h
@@ -406,14 +406,18 @@ closure_sizeW_ (StgClosure *p, StgInfoTable *info)
return tso_sizeW((StgTSO *)p);
case BCO:
return bco_sizeW((StgBCO *)p);
- case TVAR_WAIT_QUEUE:
- return sizeofW(StgTVarWaitQueue);
+ case TVAR_WATCH_QUEUE:
+ return sizeofW(StgTVarWatchQueue);
case TVAR:
return sizeofW(StgTVar);
case TREC_CHUNK:
return sizeofW(StgTRecChunk);
case TREC_HEADER:
return sizeofW(StgTRecHeader);
+ case ATOMIC_INVARIANT:
+ return sizeofW(StgAtomicInvariant);
+ case INVARIANT_CHECK_QUEUE:
+ return sizeofW(StgInvariantCheckQueue);
default:
return sizeW_fromITBL(info);
}
diff --git a/includes/mkDerivedConstants.c b/includes/mkDerivedConstants.c
index 05bf373e40..ded645ca03 100644
--- a/includes/mkDerivedConstants.c
+++ b/includes/mkDerivedConstants.c
@@ -328,15 +328,26 @@ main(int argc, char *argv[])
closure_size(StgAtomicallyFrame);
closure_field(StgAtomicallyFrame, code);
+ closure_field(StgAtomicallyFrame, next_invariant_to_check);
+
+ closure_field(StgInvariantCheckQueue, invariant);
+ closure_field(StgInvariantCheckQueue, my_execution);
+ closure_field(StgInvariantCheckQueue, next_queue_entry);
+
+ closure_field(StgAtomicInvariant, code);
closure_size(StgCatchSTMFrame);
closure_field(StgCatchSTMFrame, handler);
+ closure_field(StgCatchSTMFrame, code);
closure_size(StgCatchRetryFrame);
closure_field(StgCatchRetryFrame, running_alt_code);
closure_field(StgCatchRetryFrame, first_code);
closure_field(StgCatchRetryFrame, alt_code);
- closure_field(StgCatchRetryFrame, first_code_trec);
+
+ closure_field(StgTVarWatchQueue, closure);
+ closure_field(StgTVarWatchQueue, next_queue_entry);
+ closure_field(StgTVarWatchQueue, prev_queue_entry);
closure_size(StgWeak);
closure_field(StgWeak,link);
diff --git a/rts/Capability.c b/rts/Capability.c
index 99c68fd3b8..f1c625ef7c 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -153,7 +153,8 @@ initCapability( Capability *cap, nat i )
cap->mut_lists[g] = NULL;
}
- cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+ cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
+ cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
cap->free_trec_chunks = END_STM_CHUNK_LIST;
cap->free_trec_headers = NO_TREC;
cap->transaction_tokens = 0;
diff --git a/rts/Capability.h b/rts/Capability.h
index 641f37db01..dd17863c60 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -89,7 +89,8 @@ struct Capability_ {
#endif
// Per-capability STM-related data
- StgTVarWaitQueue *free_tvar_wait_queues;
+ StgTVarWatchQueue *free_tvar_watch_queues;
+ StgInvariantCheckQueue *free_invariant_check_queues;
StgTRecChunk *free_trec_chunks;
StgTRecHeader *free_trec_headers;
nat transaction_tokens;
diff --git a/rts/ClosureFlags.c b/rts/ClosureFlags.c
index 260bf390d0..c282cf3a88 100644
--- a/rts/ClosureFlags.c
+++ b/rts/ClosureFlags.c
@@ -90,7 +90,9 @@ StgWord16 closure_flags[] = {
/* RBH = */ ( _NS| _MUT|_UPT ),
/* EVACUATED = */ ( 0 ),
/* REMOTE_REF = */ (_HNF| _NS| _UPT ),
-/* TVAR_WAIT_QUEUE = */ ( _NS| _MUT|_UPT ),
+/* TVAR_WATCH_QUEUE = */ ( _NS| _MUT|_UPT ),
+/* INVARIANT_CHECK_QUEUE= */ ( _NS| _MUT|_UPT ),
+/* ATOMIC_INVARIANT = */ ( _NS| _MUT|_UPT ),
/* TVAR = */ (_HNF| _NS| _MUT|_UPT ),
/* TREC_CHUNK = */ ( _NS| _MUT|_UPT ),
/* TREC_HEADER = */ ( _NS| _MUT|_UPT ),
@@ -99,6 +101,6 @@ StgWord16 closure_flags[] = {
/* CATCH_STM_FRAME = */ ( _BTM )
};
-#if N_CLOSURE_TYPES != 71
+#if N_CLOSURE_TYPES != 73
#error Closure types changed: update ClosureFlags.c!
#endif
diff --git a/rts/Exception.cmm b/rts/Exception.cmm
index 0c1b6648d5..1104706c9c 100644
--- a/rts/Exception.cmm
+++ b/rts/Exception.cmm
@@ -344,12 +344,25 @@ retry_pop_stack:
if (frame_type == ATOMICALLY_FRAME) {
/* The exception has reached the edge of a memory transaction. Check that
* the transaction is valid. If not then perhaps the exception should
- * not have been thrown: re-run the transaction */
- W_ trec;
+ * not have been thrown: re-run the transaction. "trec" will either be
+ * a top-level transaction running the atomic block, or a nested
+ * transaction running an invariant check. In the latter case we
+ * abort and de-allocate the top-level transaction that encloses it
+ * as well (we could just abandon its transaction record, but this makes
+ * sure it's marked as aborted and available for re-use). */
+ W_ trec, outer;
W_ r;
trec = StgTSO_trec(CurrentTSO);
r = foreign "C" stmValidateNestOfTransactions(trec "ptr");
+ "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr") [];
foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr");
+ foreign "C" stmFreeAbortedTRec(MyCapability() "ptr", trec "ptr");
+
+ if (outer != NO_TREC) {
+ foreign "C" stmAbortTransaction(MyCapability() "ptr", outer "ptr");
+ foreign "C" stmFreeAbortedTRec(MyCapability() "ptr", outer "ptr");
+ }
+
StgTSO_trec(CurrentTSO) = NO_TREC;
if (r != 0) {
// Transaction was valid: continue searching for a catch frame
@@ -400,6 +413,9 @@ retry_pop_stack:
* If exceptions were unblocked, arrange that they are unblocked
* again after executing the handler by pushing an
* unblockAsyncExceptions_ret stack frame.
+ *
+ * If we've reached an STM catch frame then roll back the nested
+ * transaction we were using.
*/
W_ frame;
frame = Sp;
@@ -410,6 +426,12 @@ retry_pop_stack:
Sp(0) = stg_unblockAsyncExceptionszh_ret_info;
}
} else {
+ W_ trec, outer;
+ trec = StgTSO_trec(CurrentTSO);
+ "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr") [];
+ foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr") [];
+ foreign "C" stmFreeAbortedTRec(MyCapability() "ptr", trec "ptr") [];
+ StgTSO_trec(CurrentTSO) = outer;
Sp = Sp + SIZEOF_StgCatchSTMFrame;
}
diff --git a/rts/GC.c b/rts/GC.c
index 66bb5dc487..4e8b3c2a26 100644
--- a/rts/GC.c
+++ b/rts/GC.c
@@ -2233,8 +2233,8 @@ loop:
case TREC_HEADER:
return copy(q,sizeofW(StgTRecHeader),stp);
- case TVAR_WAIT_QUEUE:
- return copy(q,sizeofW(StgTVarWaitQueue),stp);
+ case TVAR_WATCH_QUEUE:
+ return copy(q,sizeofW(StgTVarWatchQueue),stp);
case TVAR:
return copy(q,sizeofW(StgTVar),stp);
@@ -2242,6 +2242,12 @@ loop:
case TREC_CHUNK:
return copy(q,sizeofW(StgTRecChunk),stp);
+ case ATOMIC_INVARIANT:
+ return copy(q,sizeofW(StgAtomicInvariant),stp);
+
+ case INVARIANT_CHECK_QUEUE:
+ return copy(q,sizeofW(StgInvariantCheckQueue),stp);
+
default:
barf("evacuate: strange closure type %d", (int)(info->type));
}
@@ -3112,16 +3118,16 @@ scavenge(step *stp)
}
#endif
- case TVAR_WAIT_QUEUE:
+ case TVAR_WATCH_QUEUE:
{
- StgTVarWaitQueue *wq = ((StgTVarWaitQueue *) p);
+ StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
evac_gen = 0;
- wq->waiting_tso = (StgTSO *)evacuate((StgClosure*)wq->waiting_tso);
- wq->next_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)wq->next_queue_entry);
- wq->prev_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)wq->prev_queue_entry);
+ wq->closure = (StgClosure*)evacuate((StgClosure*)wq->closure);
+ wq->next_queue_entry = (StgTVarWatchQueue *)evacuate((StgClosure*)wq->next_queue_entry);
+ wq->prev_queue_entry = (StgTVarWatchQueue *)evacuate((StgClosure*)wq->prev_queue_entry);
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
- p += sizeofW(StgTVarWaitQueue);
+ p += sizeofW(StgTVarWatchQueue);
break;
}
@@ -3130,7 +3136,7 @@ scavenge(step *stp)
StgTVar *tvar = ((StgTVar *) p);
evac_gen = 0;
tvar->current_value = evacuate((StgClosure*)tvar->current_value);
- tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry);
+ tvar->first_watch_queue_entry = (StgTVarWatchQueue *)evacuate((StgClosure*)tvar->first_watch_queue_entry);
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
p += sizeofW(StgTVar);
@@ -3143,6 +3149,7 @@ scavenge(step *stp)
evac_gen = 0;
trec->enclosing_trec = (StgTRecHeader *)evacuate((StgClosure*)trec->enclosing_trec);
trec->current_chunk = (StgTRecChunk *)evacuate((StgClosure*)trec->current_chunk);
+ trec->invariants_to_check = (StgInvariantCheckQueue *)evacuate((StgClosure*)trec->invariants_to_check);
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
p += sizeofW(StgTRecHeader);
@@ -3167,6 +3174,31 @@ scavenge(step *stp)
break;
}
+ case ATOMIC_INVARIANT:
+ {
+ StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
+ evac_gen = 0;
+ invariant->code = (StgClosure *)evacuate(invariant->code);
+ invariant->last_execution = (StgTRecHeader *)evacuate((StgClosure*)invariant->last_execution);
+ evac_gen = saved_evac_gen;
+ failed_to_evac = rtsTrue; // mutable
+ p += sizeofW(StgAtomicInvariant);
+ break;
+ }
+
+ case INVARIANT_CHECK_QUEUE:
+ {
+ StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
+ evac_gen = 0;
+ queue->invariant = (StgAtomicInvariant *)evacuate((StgClosure*)queue->invariant);
+ queue->my_execution = (StgTRecHeader *)evacuate((StgClosure*)queue->my_execution);
+ queue->next_queue_entry = (StgInvariantCheckQueue *)evacuate((StgClosure*)queue->next_queue_entry);
+ evac_gen = saved_evac_gen;
+ failed_to_evac = rtsTrue; // mutable
+ p += sizeofW(StgInvariantCheckQueue);
+ break;
+ }
+
default:
barf("scavenge: unimplemented/strange closure type %d @ %p",
info->type, p);
@@ -3496,13 +3528,13 @@ linear_scan:
}
#endif /* PAR */
- case TVAR_WAIT_QUEUE:
+ case TVAR_WATCH_QUEUE:
{
- StgTVarWaitQueue *wq = ((StgTVarWaitQueue *) p);
+ StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
evac_gen = 0;
- wq->waiting_tso = (StgTSO *)evacuate((StgClosure*)wq->waiting_tso);
- wq->next_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)wq->next_queue_entry);
- wq->prev_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)wq->prev_queue_entry);
+ wq->closure = (StgClosure*)evacuate((StgClosure*)wq->closure);
+ wq->next_queue_entry = (StgTVarWatchQueue *)evacuate((StgClosure*)wq->next_queue_entry);
+ wq->prev_queue_entry = (StgTVarWatchQueue *)evacuate((StgClosure*)wq->prev_queue_entry);
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
break;
@@ -3513,7 +3545,7 @@ linear_scan:
StgTVar *tvar = ((StgTVar *) p);
evac_gen = 0;
tvar->current_value = evacuate((StgClosure*)tvar->current_value);
- tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry);
+ tvar->first_watch_queue_entry = (StgTVarWatchQueue *)evacuate((StgClosure*)tvar->first_watch_queue_entry);
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
break;
@@ -3542,11 +3574,35 @@ linear_scan:
evac_gen = 0;
trec->enclosing_trec = (StgTRecHeader *)evacuate((StgClosure*)trec->enclosing_trec);
trec->current_chunk = (StgTRecChunk *)evacuate((StgClosure*)trec->current_chunk);
+ trec->invariants_to_check = (StgInvariantCheckQueue *)evacuate((StgClosure*)trec->invariants_to_check);
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
break;
}
+ case ATOMIC_INVARIANT:
+ {
+ StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
+ evac_gen = 0;
+ invariant->code = (StgClosure *)evacuate(invariant->code);
+ invariant->last_execution = (StgTRecHeader *)evacuate((StgClosure*)invariant->last_execution);
+ evac_gen = saved_evac_gen;
+ failed_to_evac = rtsTrue; // mutable
+ break;
+ }
+
+ case INVARIANT_CHECK_QUEUE:
+ {
+ StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
+ evac_gen = 0;
+ queue->invariant = (StgAtomicInvariant *)evacuate((StgClosure*)queue->invariant);
+ queue->my_execution = (StgTRecHeader *)evacuate((StgClosure*)queue->my_execution);
+ queue->next_queue_entry = (StgInvariantCheckQueue *)evacuate((StgClosure*)queue->next_queue_entry);
+ evac_gen = saved_evac_gen;
+ failed_to_evac = rtsTrue; // mutable
+ break;
+ }
+
default:
barf("scavenge_mark_stack: unimplemented/strange closure type %d @ %p",
info->type, p);
@@ -3847,13 +3903,13 @@ scavenge_one(StgPtr p)
}
#endif
- case TVAR_WAIT_QUEUE:
+ case TVAR_WATCH_QUEUE:
{
- StgTVarWaitQueue *wq = ((StgTVarWaitQueue *) p);
+ StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
evac_gen = 0;
- wq->waiting_tso = (StgTSO *)evacuate((StgClosure*)wq->waiting_tso);
- wq->next_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)wq->next_queue_entry);
- wq->prev_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)wq->prev_queue_entry);
+ wq->closure = (StgClosure*)evacuate((StgClosure*)wq->closure);
+ wq->next_queue_entry = (StgTVarWatchQueue *)evacuate((StgClosure*)wq->next_queue_entry);
+ wq->prev_queue_entry = (StgTVarWatchQueue *)evacuate((StgClosure*)wq->prev_queue_entry);
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
break;
@@ -3864,7 +3920,7 @@ scavenge_one(StgPtr p)
StgTVar *tvar = ((StgTVar *) p);
evac_gen = 0;
tvar->current_value = evacuate((StgClosure*)tvar->current_value);
- tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry);
+ tvar->first_watch_queue_entry = (StgTVarWatchQueue *)evacuate((StgClosure*)tvar->first_watch_queue_entry);
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
break;
@@ -3876,6 +3932,7 @@ scavenge_one(StgPtr p)
evac_gen = 0;
trec->enclosing_trec = (StgTRecHeader *)evacuate((StgClosure*)trec->enclosing_trec);
trec->current_chunk = (StgTRecChunk *)evacuate((StgClosure*)trec->current_chunk);
+ trec->invariants_to_check = (StgInvariantCheckQueue *)evacuate((StgClosure*)trec->invariants_to_check);
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
break;
@@ -3898,6 +3955,29 @@ scavenge_one(StgPtr p)
break;
}
+ case ATOMIC_INVARIANT:
+ {
+ StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
+ evac_gen = 0;
+ invariant->code = (StgClosure *)evacuate(invariant->code);
+ invariant->last_execution = (StgTRecHeader *)evacuate((StgClosure*)invariant->last_execution);
+ evac_gen = saved_evac_gen;
+ failed_to_evac = rtsTrue; // mutable
+ break;
+ }
+
+ case INVARIANT_CHECK_QUEUE:
+ {
+ StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
+ evac_gen = 0;
+ queue->invariant = (StgAtomicInvariant *)evacuate((StgClosure*)queue->invariant);
+ queue->my_execution = (StgTRecHeader *)evacuate((StgClosure*)queue->my_execution);
+ queue->next_queue_entry = (StgInvariantCheckQueue *)evacuate((StgClosure*)queue->next_queue_entry);
+ evac_gen = saved_evac_gen;
+ failed_to_evac = rtsTrue; // mutable
+ break;
+ }
+
case IND_OLDGEN:
case IND_OLDGEN_PERM:
case IND_STATIC:
diff --git a/rts/GCCompact.c b/rts/GCCompact.c
index 7f91501101..da3c7a7c62 100644
--- a/rts/GCCompact.c
+++ b/rts/GCCompact.c
@@ -628,20 +628,20 @@ thread_obj (StgInfoTable *info, StgPtr p)
case TSO:
return thread_TSO((StgTSO *)p);
- case TVAR_WAIT_QUEUE:
+ case TVAR_WATCH_QUEUE:
{
- StgTVarWaitQueue *wq = (StgTVarWaitQueue *)p;
- thread_(&wq->waiting_tso);
+ StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p;
+ thread_(&wq->closure);
thread_(&wq->next_queue_entry);
thread_(&wq->prev_queue_entry);
- return p + sizeofW(StgTVarWaitQueue);
+ return p + sizeofW(StgTVarWatchQueue);
}
case TVAR:
{
StgTVar *tvar = (StgTVar *)p;
thread((void *)&tvar->current_value);
- thread((void *)&tvar->first_wait_queue_entry);
+ thread((void *)&tvar->first_watch_queue_entry);
return p + sizeofW(StgTVar);
}
@@ -650,6 +650,7 @@ thread_obj (StgInfoTable *info, StgPtr p)
StgTRecHeader *trec = (StgTRecHeader *)p;
thread_(&trec->enclosing_trec);
thread_(&trec->current_chunk);
+ thread_(&trec->invariants_to_check);
return p + sizeofW(StgTRecHeader);
}
@@ -667,6 +668,23 @@ thread_obj (StgInfoTable *info, StgPtr p)
return p + sizeofW(StgTRecChunk);
}
+ case ATOMIC_INVARIANT:
+ {
+ StgAtomicInvariant *invariant = (StgAtomicInvariant *)p;
+ thread_(&invariant->code);
+ thread_(&invariant->last_execution);
+ return p + sizeofW(StgAtomicInvariant);
+ }
+
+ case INVARIANT_CHECK_QUEUE:
+ {
+ StgInvariantCheckQueue *queue = (StgInvariantCheckQueue *)p;
+ thread_(&queue->invariant);
+ thread_(&queue->my_execution);
+ thread_(&queue->next_queue_entry);
+ return p + sizeofW(StgInvariantCheckQueue);
+ }
+
default:
barf("update_fwd: unknown/strange object %d", (int)(info->type));
return NULL;
diff --git a/rts/LdvProfile.c b/rts/LdvProfile.c
index 5d96811ed7..2f9f6ca40a 100644
--- a/rts/LdvProfile.c
+++ b/rts/LdvProfile.c
@@ -108,10 +108,12 @@ processHeapClosureForDead( StgClosure *c )
case MUT_VAR_DIRTY:
case BCO:
case STABLE_NAME:
- case TVAR_WAIT_QUEUE:
+ case TVAR_WATCH_QUEUE:
case TVAR:
case TREC_HEADER:
case TREC_CHUNK:
+ case INVARIANT_CHECK_QUEUE:
+ case ATOMIC_INVARIANT:
return size;
/*
diff --git a/rts/Linker.c b/rts/Linker.c
index a8c0cdb4e2..b6e8249abd 100644
--- a/rts/Linker.c
+++ b/rts/Linker.c
@@ -488,6 +488,7 @@ typedef struct _RtsSymbolVal {
SymX(catchzh_fast) \
SymX(catchRetryzh_fast) \
SymX(catchSTMzh_fast) \
+ SymX(checkzh_fast) \
SymX(closure_flags) \
SymX(cmp_thread) \
SymX(cmpIntegerzh_fast) \
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index 990d6f31d9..075da4192d 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -980,10 +980,10 @@ CATCH_RETRY_FRAME_ERROR(stg_catch_retry_frame_7_ret)
#if defined(PROFILING)
#define CATCH_RETRY_FRAME_BITMAP 7
-#define CATCH_RETRY_FRAME_WORDS 6
+#define CATCH_RETRY_FRAME_WORDS 5
#else
#define CATCH_RETRY_FRAME_BITMAP 1
-#define CATCH_RETRY_FRAME_WORDS 4
+#define CATCH_RETRY_FRAME_WORDS 3
#endif
INFO_TABLE_RET(stg_catch_retry_frame,
@@ -1012,7 +1012,7 @@ INFO_TABLE_RET(stg_catch_retry_frame,
IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;)
jump %ENTRY_CODE(Sp(SP_OFF));
} else {
- /* Did not commit: retry */
+ /* Did not commit: re-execute */
W_ new_trec;
"ptr" new_trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr") [];
StgTSO_trec(CurrentTSO) = new_trec;
@@ -1020,7 +1020,6 @@ INFO_TABLE_RET(stg_catch_retry_frame,
R1 = StgCatchRetryFrame_alt_code(frame);
} else {
R1 = StgCatchRetryFrame_first_code(frame);
- StgCatchRetryFrame_first_code_trec(frame) = new_trec;
}
jump stg_ap_v_fast;
}
@@ -1048,10 +1047,10 @@ ATOMICALLY_FRAME_ERROR(stg_atomically_frame_7_ret)
#if defined(PROFILING)
#define ATOMICALLY_FRAME_BITMAP 3
-#define ATOMICALLY_FRAME_WORDS 3
+#define ATOMICALLY_FRAME_WORDS 4
#else
#define ATOMICALLY_FRAME_BITMAP 0
-#define ATOMICALLY_FRAME_WORDS 1
+#define ATOMICALLY_FRAME_WORDS 2
#endif
@@ -1067,26 +1066,61 @@ INFO_TABLE_RET(stg_atomically_frame,
stg_atomically_frame_6_ret,
stg_atomically_frame_7_ret)
{
- W_ frame, trec, valid;
+ W_ frame, trec, valid, next_invariant, q, outer;
IF_NOT_REG_R1(W_ rval; rval = Sp(0); Sp_adj(1); )
frame = Sp;
trec = StgTSO_trec(CurrentTSO);
+ "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr") [];
+
+ if (outer == NO_TREC) {
+ /* First time back at the atomically frame -- pick up invariants */
+ "ptr" q = foreign "C" stmGetInvariantsToCheck(MyCapability() "ptr", trec "ptr") [];
+ StgAtomicallyFrame_next_invariant_to_check(frame) = q;
- /* The TSO is not currently waiting: try to commit the transaction */
- valid = foreign "C" stmCommitTransaction(MyCapability() "ptr", trec "ptr") [];
- if (valid != 0) {
- /* Transaction was valid: commit succeeded */
- StgTSO_trec(CurrentTSO) = NO_TREC;
- Sp = Sp + SIZEOF_StgAtomicallyFrame;
- IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;)
- jump %ENTRY_CODE(Sp(SP_OFF));
} else {
- /* Transaction was not valid: try again */
- "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", NO_TREC "ptr") [];
+ /* Second/subsequent time back at the atomically frame -- abort the
+ * tx that's checking the invariant and move on to the next one */
+ StgTSO_trec(CurrentTSO) = outer;
+ q = StgAtomicallyFrame_next_invariant_to_check(frame);
+ StgInvariantCheckQueue_my_execution(q) = trec;
+ foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr") [];
+ /* Don't free trec -- it's linked from q and will be stashed in the
+ * invariant if we eventually commit. */
+ q = StgInvariantCheckQueue_next_queue_entry(q);
+ StgAtomicallyFrame_next_invariant_to_check(frame) = q;
+ trec = outer;
+ }
+
+ q = StgAtomicallyFrame_next_invariant_to_check(frame);
+
+ if (q != END_INVARIANT_CHECK_QUEUE) {
+ /* We can't commit yet: another invariant to check */
+ "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", trec "ptr") [];
StgTSO_trec(CurrentTSO) = trec;
- R1 = StgAtomicallyFrame_code(frame);
+
+ next_invariant = StgInvariantCheckQueue_invariant(q);
+ R1 = StgAtomicInvariant_code(next_invariant);
jump stg_ap_v_fast;
+
+ } else {
+
+ /* We've got no more invariants to check, try to commit */
+ valid = foreign "C" stmCommitTransaction(MyCapability() "ptr", trec "ptr") [];
+ if (valid != 0) {
+ /* Transaction was valid: commit succeeded */
+ StgTSO_trec(CurrentTSO) = NO_TREC;
+ Sp = Sp + SIZEOF_StgAtomicallyFrame;
+ IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;)
+ jump %ENTRY_CODE(Sp(SP_OFF));
+ } else {
+ /* Transaction was not valid: try again */
+ "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", NO_TREC "ptr") [];
+ StgTSO_trec(CurrentTSO) = trec;
+ StgAtomicallyFrame_next_invariant_to_check(frame) = END_INVARIANT_CHECK_QUEUE;
+ R1 = StgAtomicallyFrame_code(frame);
+ jump stg_ap_v_fast;
+ }
}
}
@@ -1127,13 +1161,29 @@ INFO_TABLE_RET(stg_atomically_waiting_frame,
// STM catch frame --------------------------------------------------------------
-#define CATCH_STM_FRAME_ENTRY_TEMPLATE(label,ret) \
- label \
- { \
- IF_NOT_REG_R1(W_ rval; rval = Sp(0); Sp_adj(1); ) \
- Sp = Sp + SIZEOF_StgCatchSTMFrame; \
- IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;) \
- jump ret; \
+#define CATCH_STM_FRAME_ENTRY_TEMPLATE(label,ret) \
+ label \
+ { \
+ IF_NOT_REG_R1(W_ rval; rval = Sp(0); Sp_adj(1); ) \
+ W_ r, frame, trec, outer; \
+ frame = Sp; \
+ trec = StgTSO_trec(CurrentTSO); \
+ "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr") []; \
+ r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr") []; \
+ if (r != 0) { \
+ /* Commit succeeded */ \
+ StgTSO_trec(CurrentTSO) = outer; \
+ Sp = Sp + SIZEOF_StgCatchSTMFrame; \
+ IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;) \
+ jump ret; \
+ } else { \
+ /* Commit failed */ \
+ W_ new_trec; \
+ "ptr" new_trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr") []; \
+ StgTSO_trec(CurrentTSO) = new_trec; \
+ R1 = StgCatchSTMFrame_code(frame); \
+ jump stg_ap_v_fast; \
+ } \
}
#ifdef REG_R1
@@ -1157,10 +1207,10 @@ CATCH_STM_FRAME_ENTRY_TEMPLATE(stg_catch_stm_frame_7_ret,%RET_VEC(Sp(SP_OFF),7))
#if defined(PROFILING)
#define CATCH_STM_FRAME_BITMAP 3
-#define CATCH_STM_FRAME_WORDS 3
+#define CATCH_STM_FRAME_WORDS 4
#else
#define CATCH_STM_FRAME_BITMAP 0
-#define CATCH_STM_FRAME_WORDS 1
+#define CATCH_STM_FRAME_WORDS 2
#endif
/* Catch frames are very similar to update frames, but when entering
@@ -1210,6 +1260,7 @@ atomicallyzh_fast
SET_HDR(frame,stg_atomically_frame_info, W_[CCCS]);
StgAtomicallyFrame_code(frame) = R1;
+ StgAtomicallyFrame_next_invariant_to_check(frame) = END_INVARIANT_CHECK_QUEUE;
/* Start the memory transcation */
"ptr" new_trec = foreign "C" stmStartTransaction(MyCapability() "ptr", old_trec "ptr") [R1];
@@ -1234,6 +1285,14 @@ catchSTMzh_fast
SET_HDR(frame, stg_catch_stm_frame_info, W_[CCCS]);
StgCatchSTMFrame_handler(frame) = R2;
+ StgCatchSTMFrame_code(frame) = R1;
+
+ /* Start a nested transaction to run the body of the try block in */
+ W_ cur_trec;
+ W_ new_trec;
+ cur_trec = StgTSO_trec(CurrentTSO);
+ "ptr" new_trec = foreign "C" stmStartTransaction(MyCapability() "ptr", cur_trec "ptr");
+ StgTSO_trec(CurrentTSO) = new_trec;
/* Apply R1 to the realworld token */
jump stg_ap_v_fast;
@@ -1266,7 +1325,6 @@ catchRetryzh_fast
StgCatchRetryFrame_running_alt_code(frame) = 0 :: CInt; // false;
StgCatchRetryFrame_first_code(frame) = R1;
StgCatchRetryFrame_alt_code(frame) = R2;
- StgCatchRetryFrame_first_code_trec(frame) = new_trec;
/* Apply R1 to the realworld token */
jump stg_ap_v_fast;
@@ -1285,54 +1343,48 @@ retryzh_fast
// Find the enclosing ATOMICALLY_FRAME or CATCH_RETRY_FRAME
retry_pop_stack:
- trec = StgTSO_trec(CurrentTSO);
- "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr") [];
StgTSO_sp(CurrentTSO) = Sp;
frame_type = foreign "C" findRetryFrameHelper(CurrentTSO "ptr") [];
Sp = StgTSO_sp(CurrentTSO);
frame = Sp;
+ trec = StgTSO_trec(CurrentTSO);
+ "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr") [];
if (frame_type == CATCH_RETRY_FRAME) {
// The retry reaches a CATCH_RETRY_FRAME before the atomic frame
ASSERT(outer != NO_TREC);
+ // Abort the transaction attempting the current branch
+ foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr") [];
+ foreign "C" stmFreeAbortedTRec(MyCapability() "ptr", trec "ptr") [];
if (!StgCatchRetryFrame_running_alt_code(frame) != 0::I32) {
- // Retry in the first code: try the alternative
+ // Retry in the first branch: try the alternative
"ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr") [];
StgTSO_trec(CurrentTSO) = trec;
StgCatchRetryFrame_running_alt_code(frame) = 1 :: CInt; // true;
R1 = StgCatchRetryFrame_alt_code(frame);
jump stg_ap_v_fast;
} else {
- // Retry in the alternative code: propagate
- W_ other_trec;
- other_trec = StgCatchRetryFrame_first_code_trec(frame);
- r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", other_trec "ptr") [];
- if (r != 0) {
- r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr") [];
- } else {
- foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr") [];
- }
- if (r != 0) {
- // Merge between siblings succeeded: commit it back to enclosing transaction
- // and then propagate the retry
- StgTSO_trec(CurrentTSO) = outer;
- Sp = Sp + SIZEOF_StgCatchRetryFrame;
- goto retry_pop_stack;
- } else {
- // Merge failed: we musn't propagate the retry. Try both paths again.
- "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr") [];
- StgCatchRetryFrame_first_code_trec(frame) = trec;
- StgCatchRetryFrame_running_alt_code(frame) = 0 :: CInt; // false;
- StgTSO_trec(CurrentTSO) = trec;
- R1 = StgCatchRetryFrame_first_code(frame);
- jump stg_ap_v_fast;
- }
+ // Retry in the alternative code: propagate the retry
+ StgTSO_trec(CurrentTSO) = outer;
+ Sp = Sp + SIZEOF_StgCatchRetryFrame;
+ goto retry_pop_stack;
}
}
// We've reached the ATOMICALLY_FRAME: attempt to wait
ASSERT(frame_type == ATOMICALLY_FRAME);
+ if (outer != NO_TREC) {
+ // We called retry while checking invariants, so abort the current
+ // invariant check (merging its TVar accesses into the parents read
+ // set so we'll wait on them)
+ foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr") [];
+ foreign "C" stmFreeAbortedTRec(MyCapability() "ptr", trec "ptr") [];
+ trec = outer;
+ StgTSO_trec(CurrentTSO) = trec;
+ "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr") [];
+ }
ASSERT(outer == NO_TREC);
+
r = foreign "C" stmWait(MyCapability() "ptr", CurrentTSO "ptr", trec "ptr") [];
if (r != 0) {
// Transaction was valid: stmWait put us on the TVars' queues, we now block
@@ -1355,6 +1407,23 @@ retry_pop_stack:
}
+checkzh_fast
+{
+ W_ trec, closure;
+
+ /* Args: R1 = invariant closure */
+ MAYBE_GC (R1_PTR, checkzh_fast);
+
+ trec = StgTSO_trec(CurrentTSO);
+ closure = R1;
+ foreign "C" stmAddInvariantToCheck(MyCapability() "ptr",
+ trec "ptr",
+ closure "ptr") [];
+
+ jump %ENTRY_CODE(Sp(0));
+}
+
+
newTVarzh_fast
{
W_ tv;
diff --git a/rts/Printer.c b/rts/Printer.c
index 05a9660f4a..671d76fbf8 100644
--- a/rts/Printer.c
+++ b/rts/Printer.c
@@ -714,7 +714,9 @@ static char *closure_type_names[] = {
"RBH",
"EVACUATED",
"REMOTE_REF",
- "TVAR_WAIT_QUEUE",
+ "TVAR_WATCH_QUEUE",
+ "INVARIANT_CHECK_QUEUE",
+ "ATOMIC_INVARIANT",
"TVAR",
"TREC_CHUNK",
"TREC_HEADER",
diff --git a/rts/ProfHeap.c b/rts/ProfHeap.c
index c161d0c6a7..2818fb6946 100644
--- a/rts/ProfHeap.c
+++ b/rts/ProfHeap.c
@@ -987,9 +987,19 @@ heapCensusChain( Census *census, bdescr *bd )
size = sizeofW(StgTRecHeader);
break;
- case TVAR_WAIT_QUEUE:
+ case TVAR_WATCH_QUEUE:
prim = rtsTrue;
- size = sizeofW(StgTVarWaitQueue);
+ size = sizeofW(StgTVarWatchQueue);
+ break;
+
+ case INVARIANT_CHECK_QUEUE:
+ prim = rtsTrue;
+ size = sizeofW(StgInvariantCheckQueue);
+ break;
+
+ case ATOMIC_INVARIANT:
+ prim = rtsTrue;
+ size = sizeofW(StgAtomicInvariant);
break;
case TVAR:
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index 3b97b9c6dd..0f84ae5360 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -1020,6 +1020,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
StgTRecHeader *trec = tso -> trec;
StgTRecHeader *outer = stmGetEnclosingTRec(trec);
stmAbortTransaction(cap, trec);
+ stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;
break;
diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c
index a18a194c02..e63fb54978 100644
--- a/rts/RetainerProfile.c
+++ b/rts/RetainerProfile.c
@@ -591,8 +591,8 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child )
return; // no child
break;
- case TVAR_WAIT_QUEUE:
- *first_child = (StgClosure *)((StgTVarWaitQueue *)c)->waiting_tso;
+ case TVAR_WATCH_QUEUE:
+ *first_child = (StgClosure *)((StgTVarWatchQueue *)c)->closure;
se.info.next.step = 2; // 2 = second
break;
case TVAR:
@@ -830,13 +830,13 @@ pop( StgClosure **c, StgClosure **cp, retainer *r )
*r = se->c_child_r;
return;
- case TVAR_WAIT_QUEUE:
+ case TVAR_WATCH_QUEUE:
if (se->info.next.step == 2) {
- *c = (StgClosure *)((StgTVarWaitQueue *)se->c)->next_queue_entry;
+ *c = (StgClosure *)((StgTVarWatchQueue *)se->c)->next_queue_entry;
se->info.next.step++; // move to the next step
// no popOff
} else {
- *c = (StgClosure *)((StgTVarWaitQueue *)se->c)->prev_queue_entry;
+ *c = (StgClosure *)((StgTVarWatchQueue *)se->c)->prev_queue_entry;
popOff();
}
*cp = se->c;
@@ -844,7 +844,7 @@ pop( StgClosure **c, StgClosure **cp, retainer *r )
return;
case TVAR:
- *c = (StgClosure *)((StgTVar *)se->c)->first_wait_queue_entry;
+ *c = (StgClosure *)((StgTVar *)se->c)->first_watch_queue_entry;
*cp = se->c;
*r = se->c_child_r;
popOff();
@@ -1125,7 +1125,7 @@ isRetainer( StgClosure *c )
case BCO:
case ARR_WORDS:
// STM
- case TVAR_WAIT_QUEUE:
+ case TVAR_WATCH_QUEUE:
case TREC_HEADER:
case TREC_CHUNK:
return rtsFalse;
diff --git a/rts/STM.c b/rts/STM.c
index 01155b1107..6bf20f96f2 100644
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -74,7 +74,7 @@
* (d) release the locks on the TVars, writing updates to them in the case of a
* commit, (e) unlock the STM.
*
- * Queues of waiting threads hang off the first_wait_queue_entry field of each
+ * Queues of waiting threads hang off the first_watch_queue_entry field of each
* TVar. This may only be manipulated when holding that TVar's lock. In
* particular, when a thread is putting itself to sleep, it mustn't release
* the TVar's lock until it has added itself to the wait queue and marked its
@@ -146,7 +146,7 @@ static int shake(void) {
StgTRecHeader *__t = (_t); \
StgTRecChunk *__c = __t -> current_chunk; \
StgWord __limit = __c -> next_entry_idx; \
- TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld\n", __t, __c, __limit); \
+ TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld", __t, __c, __limit); \
while (__c != END_STM_CHUNK_LIST) { \
StgWord __i; \
for (__i = 0; __i < __limit; __i ++) { \
@@ -178,20 +178,20 @@ static int shake(void) {
#if defined(STM_UNIPROC)
#undef IF_STM_UNIPROC
#define IF_STM_UNIPROC(__X) do { __X } while (0)
-static const StgBool use_read_phase = FALSE;
+static const StgBool config_use_read_phase = FALSE;
static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
- TRACE("%p : lock_stm()\n", trec);
+ TRACE("%p : lock_stm()", trec);
}
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
- TRACE("%p : unlock_stm()\n", trec);
+ TRACE("%p : unlock_stm()", trec);
}
static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
StgTVar *s STG_UNUSED) {
StgClosure *result;
- TRACE("%p : lock_tvar(%p)\n", trec, s);
+ TRACE("%p : lock_tvar(%p)", trec, s);
result = s -> current_value;
return result;
}
@@ -200,7 +200,7 @@ static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
StgTVar *s STG_UNUSED,
StgClosure *c,
StgBool force_update) {
- TRACE("%p : unlock_tvar(%p)\n", trec, s);
+ TRACE("%p : unlock_tvar(%p)", trec, s);
if (force_update) {
s -> current_value = c;
}
@@ -210,27 +210,36 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
StgTVar *s STG_UNUSED,
StgClosure *expected) {
StgClosure *result;
- TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
+ TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
result = s -> current_value;
- TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
+ TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
return (result == expected);
}
+
+static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
+ // Nothing -- uniproc
+ return TRUE;
+}
+
+static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) {
+ // Nothing -- uniproc
+}
#endif
#if defined(STM_CG_LOCK) /*........................................*/
#undef IF_STM_CG_LOCK
#define IF_STM_CG_LOCK(__X) do { __X } while (0)
-static const StgBool use_read_phase = FALSE;
+static const StgBool config_use_read_phase = FALSE;
static volatile StgTRecHeader *smp_locked = NULL;
static void lock_stm(StgTRecHeader *trec) {
while (cas(&smp_locked, NULL, trec) != NULL) { }
- TRACE("%p : lock_stm()\n", trec);
+ TRACE("%p : lock_stm()", trec);
}
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
- TRACE("%p : unlock_stm()\n", trec);
+ TRACE("%p : unlock_stm()", trec);
ASSERT (smp_locked == trec);
smp_locked = 0;
}
@@ -238,7 +247,7 @@ static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
StgTVar *s STG_UNUSED) {
StgClosure *result;
- TRACE("%p : lock_tvar(%p)\n", trec, s);
+ TRACE("%p : lock_tvar(%p)", trec, s);
ASSERT (smp_locked == trec);
result = s -> current_value;
return result;
@@ -248,7 +257,7 @@ static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED,
StgTVar *s STG_UNUSED,
StgClosure *c,
StgBool force_update) {
- TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
+ TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
ASSERT (smp_locked == trec);
if (force_update) {
s -> current_value = c;
@@ -259,32 +268,41 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
StgTVar *s STG_UNUSED,
StgClosure *expected) {
StgClosure *result;
- TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
+ TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
ASSERT (smp_locked == trec);
result = s -> current_value;
- TRACE("%p : %d\n", result ? "success" : "failure");
+ TRACE("%p : %d", result ? "success" : "failure");
return (result == expected);
}
+
+static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
+ // Nothing -- protected by STM lock
+ return TRUE;
+}
+
+static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) {
+ // Nothing -- protected by STM lock
+}
#endif
#if defined(STM_FG_LOCKS) /*...................................*/
#undef IF_STM_FG_LOCKS
#define IF_STM_FG_LOCKS(__X) do { __X } while (0)
-static const StgBool use_read_phase = TRUE;
+static const StgBool config_use_read_phase = TRUE;
static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
- TRACE("%p : lock_stm()\n", trec);
+ TRACE("%p : lock_stm()", trec);
}
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
- TRACE("%p : unlock_stm()\n", trec);
+ TRACE("%p : unlock_stm()", trec);
}
static StgClosure *lock_tvar(StgTRecHeader *trec,
StgTVar *s STG_UNUSED) {
StgClosure *result;
- TRACE("%p : lock_tvar(%p)\n", trec, s);
+ TRACE("%p : lock_tvar(%p)", trec, s);
do {
do {
result = s -> current_value;
@@ -298,7 +316,7 @@ static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
StgTVar *s,
StgClosure *c,
StgBool force_update STG_UNUSED) {
- TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
+ TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
ASSERT(s -> current_value == (StgClosure *)trec);
s -> current_value = c;
}
@@ -308,23 +326,46 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec,
StgClosure *expected) {
StgClosure *result;
StgWord w;
- TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
+ TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
result = (StgClosure *)w;
- TRACE("%p : %s\n", trec, result ? "success" : "failure");
+ TRACE("%p : %s", trec, result ? "success" : "failure");
return (result == expected);
}
+
+static StgBool lock_inv(StgAtomicInvariant *inv) {
+ return (cas(&(inv -> lock), 0, 1) == 0);
+}
+
+static void unlock_inv(StgAtomicInvariant *inv) {
+ ASSERT(inv -> lock == 1);
+ inv -> lock = 0;
+}
#endif
/*......................................................................*/
+
+static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
+ StgClosure *c = q -> closure;
+ StgInfoTable *info = get_itbl(c);
+ return (info -> type) == TSO;
+}
+
+static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
+ StgClosure *c = q -> closure;
+ StgInfoTable *info = get_itbl(c);
+ return (info -> type) == ATOMIC_INVARIANT;
+}
+/*......................................................................*/
+
// Helper functions for thread blocking and unblocking
static void park_tso(StgTSO *tso) {
ASSERT(tso -> why_blocked == NotBlocked);
tso -> why_blocked = BlockedOnSTM;
tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
- TRACE("park_tso on tso=%p\n", tso);
+ TRACE("park_tso on tso=%p", tso);
}
static void unpark_tso(Capability *cap, StgTSO *tso) {
@@ -337,21 +378,23 @@ static void unpark_tso(Capability *cap, StgTSO *tso) {
// synchronise with throwTo().
lockTSO(tso);
if (tso -> why_blocked == BlockedOnSTM) {
- TRACE("unpark_tso on tso=%p\n", tso);
+ TRACE("unpark_tso on tso=%p", tso);
unblockOne(cap,tso);
} else {
- TRACE("spurious unpark_tso on tso=%p\n", tso);
+ TRACE("spurious unpark_tso on tso=%p", tso);
}
unlockTSO(tso);
}
static void unpark_waiters_on(Capability *cap, StgTVar *s) {
- StgTVarWaitQueue *q;
- TRACE("unpark_waiters_on tvar=%p\n", s);
- for (q = s -> first_wait_queue_entry;
- q != END_STM_WAIT_QUEUE;
+ StgTVarWatchQueue *q;
+ TRACE("unpark_waiters_on tvar=%p", s);
+ for (q = s -> first_watch_queue_entry;
+ q != END_STM_WATCH_QUEUE;
q = q -> next_queue_entry) {
- unpark_tso(cap, q -> waiting_tso);
+ if (watcher_is_tso(q)) {
+ unpark_tso(cap, (StgTSO *)(q -> closure));
+ }
}
}
@@ -359,12 +402,22 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) {
// Helper functions for downstream allocation and initialization
-static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
- StgTSO *waiting_tso) {
- StgTVarWaitQueue *result;
- result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue));
- SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
- result -> waiting_tso = waiting_tso;
+static StgInvariantCheckQueue *new_stg_invariant_check_queue(Capability *cap,
+ StgAtomicInvariant *invariant) {
+ StgInvariantCheckQueue *result;
+ result = (StgInvariantCheckQueue *)allocateLocal(cap, sizeofW(StgInvariantCheckQueue));
+ SET_HDR (result, &stg_INVARIANT_CHECK_QUEUE_info, CCS_SYSTEM);
+ result -> invariant = invariant;
+ result -> my_execution = NO_TREC;
+ return result;
+}
+
+static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
+ StgClosure *closure) {
+ StgTVarWatchQueue *result;
+ result = (StgTVarWatchQueue *)allocateLocal(cap, sizeofW(StgTVarWatchQueue));
+ SET_HDR (result, &stg_TVAR_WATCH_QUEUE_info, CCS_SYSTEM);
+ result -> closure = closure;
return result;
}
@@ -385,6 +438,7 @@ static StgTRecHeader *new_stg_trec_header(Capability *cap,
result -> enclosing_trec = enclosing_trec;
result -> current_chunk = new_stg_trec_chunk(cap);
+ result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
if (enclosing_trec == NO_TREC) {
result -> state = TREC_ACTIVE;
@@ -402,24 +456,38 @@ static StgTRecHeader *new_stg_trec_header(Capability *cap,
// Allocation / deallocation functions that retain per-capability lists
// of closures that can be re-used
-static StgTVarWaitQueue *alloc_stg_tvar_wait_queue(Capability *cap,
- StgTSO *waiting_tso) {
- StgTVarWaitQueue *result = NULL;
- if (cap -> free_tvar_wait_queues == END_STM_WAIT_QUEUE) {
- result = new_stg_tvar_wait_queue(cap, waiting_tso);
+static StgInvariantCheckQueue *alloc_stg_invariant_check_queue(Capability *cap,
+ StgAtomicInvariant *invariant) {
+ StgInvariantCheckQueue *result = NULL;
+ if (cap -> free_invariant_check_queues == END_INVARIANT_CHECK_QUEUE) {
+ result = new_stg_invariant_check_queue(cap, invariant);
+ } else {
+ result = cap -> free_invariant_check_queues;
+ result -> invariant = invariant;
+ result -> my_execution = NO_TREC;
+ cap -> free_invariant_check_queues = result -> next_queue_entry;
+ }
+ return result;
+}
+
+static StgTVarWatchQueue *alloc_stg_tvar_watch_queue(Capability *cap,
+ StgClosure *closure) {
+ StgTVarWatchQueue *result = NULL;
+ if (cap -> free_tvar_watch_queues == END_STM_WATCH_QUEUE) {
+ result = new_stg_tvar_watch_queue(cap, closure);
} else {
- result = cap -> free_tvar_wait_queues;
- result -> waiting_tso = waiting_tso;
- cap -> free_tvar_wait_queues = result -> next_queue_entry;
+ result = cap -> free_tvar_watch_queues;
+ result -> closure = closure;
+ cap -> free_tvar_watch_queues = result -> next_queue_entry;
}
return result;
}
-static void free_stg_tvar_wait_queue(Capability *cap,
- StgTVarWaitQueue *wq) {
+static void free_stg_tvar_watch_queue(Capability *cap,
+ StgTVarWatchQueue *wq) {
#if defined(REUSE_MEMORY)
- wq -> next_queue_entry = cap -> free_tvar_wait_queues;
- cap -> free_tvar_wait_queues = wq;
+ wq -> next_queue_entry = cap -> free_tvar_watch_queues;
+ cap -> free_tvar_watch_queues = wq;
#endif
}
@@ -454,6 +522,7 @@ static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
cap -> free_trec_headers = result -> enclosing_trec;
result -> enclosing_trec = enclosing_trec;
result -> current_chunk -> next_entry_idx = 0;
+ result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
if (enclosing_trec == NO_TREC) {
result -> state = TREC_ACTIVE;
} else {
@@ -484,66 +553,69 @@ static void free_stg_trec_header(Capability *cap,
// Helper functions for managing waiting lists
-static void build_wait_queue_entries_for_trec(Capability *cap,
- StgTSO *tso,
- StgTRecHeader *trec) {
+static void build_watch_queue_entries_for_trec(Capability *cap,
+ StgTSO *tso,
+ StgTRecHeader *trec) {
ASSERT(trec != NO_TREC);
ASSERT(trec -> enclosing_trec == NO_TREC);
ASSERT(trec -> state == TREC_ACTIVE);
- TRACE("%p : build_wait_queue_entries_for_trec()\n", trec);
+ TRACE("%p : build_watch_queue_entries_for_trec()", trec);
FOR_EACH_ENTRY(trec, e, {
StgTVar *s;
- StgTVarWaitQueue *q;
- StgTVarWaitQueue *fq;
+ StgTVarWatchQueue *q;
+ StgTVarWatchQueue *fq;
s = e -> tvar;
- TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s);
+ TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
NACQ_ASSERT(s -> current_value == e -> expected_value);
- fq = s -> first_wait_queue_entry;
- q = alloc_stg_tvar_wait_queue(cap, tso);
+ fq = s -> first_watch_queue_entry;
+ q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
q -> next_queue_entry = fq;
- q -> prev_queue_entry = END_STM_WAIT_QUEUE;
- if (fq != END_STM_WAIT_QUEUE) {
+ q -> prev_queue_entry = END_STM_WATCH_QUEUE;
+ if (fq != END_STM_WATCH_QUEUE) {
fq -> prev_queue_entry = q;
}
- s -> first_wait_queue_entry = q;
+ s -> first_watch_queue_entry = q;
e -> new_value = (StgClosure *) q;
});
}
-static void remove_wait_queue_entries_for_trec(Capability *cap,
- StgTRecHeader *trec) {
+static void remove_watch_queue_entries_for_trec(Capability *cap,
+ StgTRecHeader *trec) {
ASSERT(trec != NO_TREC);
ASSERT(trec -> enclosing_trec == NO_TREC);
ASSERT(trec -> state == TREC_WAITING ||
trec -> state == TREC_CONDEMNED);
- TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec);
+ TRACE("%p : remove_watch_queue_entries_for_trec()", trec);
FOR_EACH_ENTRY(trec, e, {
StgTVar *s;
- StgTVarWaitQueue *pq;
- StgTVarWaitQueue *nq;
- StgTVarWaitQueue *q;
+ StgTVarWatchQueue *pq;
+ StgTVarWatchQueue *nq;
+ StgTVarWatchQueue *q;
s = e -> tvar;
StgClosure *saw = lock_tvar(trec, s);
- q = (StgTVarWaitQueue *) (e -> new_value);
- TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s);
+ q = (StgTVarWatchQueue *) (e -> new_value);
+ TRACE("%p : removing tso=%p from watch queue for tvar=%p",
+ trec,
+ q -> closure,
+ s);
ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
nq = q -> next_queue_entry;
pq = q -> prev_queue_entry;
- if (nq != END_STM_WAIT_QUEUE) {
+ if (nq != END_STM_WATCH_QUEUE) {
nq -> prev_queue_entry = pq;
}
- if (pq != END_STM_WAIT_QUEUE) {
+ if (pq != END_STM_WATCH_QUEUE) {
pq -> next_queue_entry = nq;
} else {
- ASSERT (s -> first_wait_queue_entry == q);
- s -> first_wait_queue_entry = nq;
+ ASSERT (s -> first_watch_queue_entry == q);
+ s -> first_watch_queue_entry = nq;
}
- free_stg_tvar_wait_queue(cap, q);
+ free_stg_tvar_watch_queue(cap, q);
unlock_tvar(trec, s, saw, FALSE);
});
}
@@ -595,7 +667,7 @@ static void merge_update_into(Capability *cap,
found = TRUE;
if (e -> expected_value != expected_value) {
// Must abort if the two entries start from different values
- TRACE("%p : entries inconsistent at %p (%p vs %p)\n",
+ TRACE("%p : update entries inconsistent at %p (%p vs %p)",
t, tvar, e -> expected_value, expected_value);
t -> state = TREC_CONDEMNED;
}
@@ -616,6 +688,41 @@ static void merge_update_into(Capability *cap,
/*......................................................................*/
+static void merge_read_into(Capability *cap,
+ StgTRecHeader *t,
+ StgTVar *tvar,
+ StgClosure *expected_value) {
+ int found;
+
+ // Look for an entry in this trec
+ found = FALSE;
+ FOR_EACH_ENTRY(t, e, {
+ StgTVar *s;
+ s = e -> tvar;
+ if (s == tvar) {
+ found = TRUE;
+ if (e -> expected_value != expected_value) {
+ // Must abort if the two entries start from different values
+ TRACE("%p : read entries inconsistent at %p (%p vs %p)",
+ t, tvar, e -> expected_value, expected_value);
+ t -> state = TREC_CONDEMNED;
+ }
+ BREAK_FOR_EACH;
+ }
+ });
+
+ if (!found) {
+ // No entry so far in this trec
+ TRecEntry *ne;
+ ne = get_new_entry(cap, t);
+ ne -> tvar = tvar;
+ ne -> expected_value = expected_value;
+ ne -> new_value = expected_value;
+ }
+}
+
+/*......................................................................*/
+
static StgBool entry_is_update(TRecEntry *e) {
StgBool result;
result = (e -> expected_value != e -> new_value);
@@ -680,7 +787,7 @@ static StgBool validate_and_acquire_ownership (StgTRecHeader *trec,
StgBool result;
if (shake()) {
- TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec);
+ TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
return FALSE;
}
@@ -693,28 +800,28 @@ static StgBool validate_and_acquire_ownership (StgTRecHeader *trec,
StgTVar *s;
s = e -> tvar;
if (acquire_all || entry_is_update(e)) {
- TRACE("%p : trying to acquire %p\n", trec, s);
+ TRACE("%p : trying to acquire %p", trec, s);
if (!cond_lock_tvar(trec, s, e -> expected_value)) {
- TRACE("%p : failed to acquire %p\n", trec, s);
+ TRACE("%p : failed to acquire %p", trec, s);
result = FALSE;
BREAK_FOR_EACH;
}
} else {
- ASSERT(use_read_phase);
+ ASSERT(config_use_read_phase);
IF_STM_FG_LOCKS({
- TRACE("%p : will need to check %p\n", trec, s);
+ TRACE("%p : will need to check %p", trec, s);
if (s -> current_value != e -> expected_value) {
- TRACE("%p : doesn't match\n", trec);
+ TRACE("%p : doesn't match", trec);
result = FALSE;
BREAK_FOR_EACH;
}
e -> num_updates = s -> num_updates;
if (s -> current_value != e -> expected_value) {
- TRACE("%p : doesn't match (race)\n", trec);
+ TRACE("%p : doesn't match (race)", trec);
result = FALSE;
BREAK_FOR_EACH;
} else {
- TRACE("%p : need to check version %ld\n", trec, e -> num_updates);
+ TRACE("%p : need to check version %ld", trec, e -> num_updates);
}
});
}
@@ -742,7 +849,7 @@ static StgBool validate_and_acquire_ownership (StgTRecHeader *trec,
static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
StgBool result = TRUE;
- ASSERT (use_read_phase);
+ ASSERT (config_use_read_phase);
IF_STM_FG_LOCKS({
FOR_EACH_ENTRY(trec, e, {
StgTVar *s;
@@ -751,7 +858,7 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
if (s -> num_updates != e -> num_updates) {
// ||s -> current_value != e -> expected_value) {
- TRACE("%p : mismatch\n", trec);
+ TRACE("%p : mismatch", trec);
result = FALSE;
BREAK_FOR_EACH;
}
@@ -769,10 +876,10 @@ void stmPreGCHook() {
nat i;
lock_stm(NO_TREC);
- TRACE("stmPreGCHook\n");
+ TRACE("stmPreGCHook");
for (i = 0; i < n_capabilities; i ++) {
Capability *cap = &capabilities[i];
- cap -> free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+ cap -> free_tvar_watch_queues = END_STM_WATCH_QUEUE;
cap -> free_trec_chunks = END_STM_CHUNK_LIST;
cap -> free_trec_headers = NO_TREC;
}
@@ -799,6 +906,7 @@ static volatile StgBool token_locked = FALSE;
static void getTokenBatch(Capability *cap) {
while (cas((void *)&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
max_commits += TOKEN_BATCH_SIZE;
+ TRACE("%p : cap got token batch, max_commits=%lld", cap, max_commits);
cap -> transaction_tokens = TOKEN_BATCH_SIZE;
token_locked = FALSE;
}
@@ -820,14 +928,14 @@ static void getToken(Capability *cap STG_UNUSED) {
StgTRecHeader *stmStartTransaction(Capability *cap,
StgTRecHeader *outer) {
StgTRecHeader *t;
- TRACE("%p : stmStartTransaction with %d tokens\n",
+ TRACE("%p : stmStartTransaction with %d tokens",
outer,
cap -> transaction_tokens);
getToken(cap);
t = alloc_stg_trec_header(cap, outer);
- TRACE("%p : stmStartTransaction()=%p\n", outer, t);
+ TRACE("%p : stmStartTransaction()=%p", outer, t);
return t;
}
@@ -835,31 +943,61 @@ StgTRecHeader *stmStartTransaction(Capability *cap,
void stmAbortTransaction(Capability *cap,
StgTRecHeader *trec) {
- TRACE("%p : stmAbortTransaction\n", trec);
+ TRACE("%p : stmAbortTransaction", trec);
ASSERT (trec != NO_TREC);
ASSERT ((trec -> state == TREC_ACTIVE) ||
(trec -> state == TREC_WAITING) ||
(trec -> state == TREC_CONDEMNED));
lock_stm(trec);
- if (trec -> state == TREC_WAITING) {
- ASSERT (trec -> enclosing_trec == NO_TREC);
- TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
- remove_wait_queue_entries_for_trec(cap, trec);
+
+ StgTRecHeader *et = trec -> enclosing_trec;
+ if (et == NO_TREC) {
+ // We're a top-level transaction: remove any watch queue entries that
+ // we may have.
+ TRACE("%p : aborting top-level transaction", trec);
+
+ if (trec -> state == TREC_WAITING) {
+ ASSERT (trec -> enclosing_trec == NO_TREC);
+ TRACE("%p : stmAbortTransaction aborting waiting transaction", trec);
+ remove_watch_queue_entries_for_trec(cap, trec);
+ }
+
+ } else {
+ // We're a nested transaction: merge our read set into our parent's
+ TRACE("%p : retaining read-set into parent %p", trec, et);
+
+ FOR_EACH_ENTRY(trec, e, {
+ StgTVar *s = e -> tvar;
+ merge_read_into(cap, et, s, e -> expected_value);
+ });
}
+
trec -> state = TREC_ABORTED;
unlock_stm(trec);
+ TRACE("%p : stmAbortTransaction done", trec);
+}
+
+/*......................................................................*/
+
+void stmFreeAbortedTRec(Capability *cap,
+ StgTRecHeader *trec) {
+ TRACE("%p : stmFreeAbortedTRec", trec);
+ ASSERT (trec != NO_TREC);
+ ASSERT ((trec -> state == TREC_CONDEMNED) ||
+ (trec -> state == TREC_ABORTED));
+
free_stg_trec_header(cap, trec);
- TRACE("%p : stmAbortTransaction done\n", trec);
+ TRACE("%p : stmFreeAbortedTRec done", trec);
}
/*......................................................................*/
void stmCondemnTransaction(Capability *cap,
StgTRecHeader *trec) {
- TRACE("%p : stmCondemnTransaction\n", trec);
+ TRACE("%p : stmCondemnTransaction", trec);
ASSERT (trec != NO_TREC);
ASSERT ((trec -> state == TREC_ACTIVE) ||
(trec -> state == TREC_WAITING) ||
@@ -868,22 +1006,22 @@ void stmCondemnTransaction(Capability *cap,
lock_stm(trec);
if (trec -> state == TREC_WAITING) {
ASSERT (trec -> enclosing_trec == NO_TREC);
- TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
- remove_wait_queue_entries_for_trec(cap, trec);
+ TRACE("%p : stmCondemnTransaction condemning waiting transaction", trec);
+ remove_watch_queue_entries_for_trec(cap, trec);
}
trec -> state = TREC_CONDEMNED;
unlock_stm(trec);
- TRACE("%p : stmCondemnTransaction done\n", trec);
+ TRACE("%p : stmCondemnTransaction done", trec);
}
/*......................................................................*/
StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
StgTRecHeader *outer;
- TRACE("%p : stmGetEnclosingTRec\n", trec);
+ TRACE("%p : stmGetEnclosingTRec", trec);
outer = trec -> enclosing_trec;
- TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, outer);
+ TRACE("%p : stmGetEnclosingTRec()=%p", trec, outer);
return outer;
}
@@ -893,7 +1031,7 @@ StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
StgTRecHeader *t;
StgBool result;
- TRACE("%p : stmValidateNestOfTransactions\n", trec);
+ TRACE("%p : stmValidateNestOfTransactions", trec);
ASSERT(trec != NO_TREC);
ASSERT((trec -> state == TREC_ACTIVE) ||
(trec -> state == TREC_WAITING) ||
@@ -914,17 +1052,229 @@ StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
unlock_stm(trec);
- TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, result);
+ TRACE("%p : stmValidateNestOfTransactions()=%d", trec, result);
return result;
}
/*......................................................................*/
+static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
+ TRecEntry *result = NULL;
+
+ TRACE("%p : get_entry_for TVar %p", trec, tvar);
+ ASSERT(trec != NO_TREC);
+
+ do {
+ FOR_EACH_ENTRY(trec, e, {
+ if (e -> tvar == tvar) {
+ result = e;
+ if (in != NULL) {
+ *in = trec;
+ }
+ BREAK_FOR_EACH;
+ }
+ });
+ trec = trec -> enclosing_trec;
+ } while (result == NULL && trec != NO_TREC);
+
+ return result;
+}
+
+/*......................................................................*/
+
+/*
+ * Add/remove links between an invariant TVars. The caller must have
+ * locked the TVars involved and the invariant.
+ */
+
+static void disconnect_invariant(Capability *cap,
+ StgAtomicInvariant *inv) {
+ StgTRecHeader *last_execution = inv -> last_execution;
+
+ TRACE("unhooking last execution inv=%p trec=%p", inv, last_execution);
+
+ FOR_EACH_ENTRY(last_execution, e, {
+ StgTVar *s = e -> tvar;
+ StgTVarWatchQueue *q = s -> first_watch_queue_entry;
+ StgBool found = FALSE;
+ TRACE(" looking for trec on tvar=%p", s);
+ for (q = s -> first_watch_queue_entry;
+ q != END_STM_WATCH_QUEUE;
+ q = q -> next_queue_entry) {
+ if (q -> closure == (StgClosure*)inv) {
+ StgTVarWatchQueue *pq;
+ StgTVarWatchQueue *nq;
+ nq = q -> next_queue_entry;
+ pq = q -> prev_queue_entry;
+ if (nq != END_STM_WATCH_QUEUE) {
+ nq -> prev_queue_entry = pq;
+ }
+ if (pq != END_STM_WATCH_QUEUE) {
+ pq -> next_queue_entry = nq;
+ } else {
+ ASSERT (s -> first_watch_queue_entry == q);
+ s -> first_watch_queue_entry = nq;
+ }
+ TRACE(" found it in watch queue entry %p", q);
+ free_stg_tvar_watch_queue(cap, q);
+ found = TRUE;
+ break;
+ }
+ }
+ ASSERT(found);
+ });
+ inv -> last_execution = NO_TREC;
+}
+
+static void connect_invariant_to_trec(Capability *cap,
+ StgAtomicInvariant *inv,
+ StgTRecHeader *my_execution) {
+ TRACE("connecting execution inv=%p trec=%p", inv, my_execution);
+
+ ASSERT(inv -> last_execution == NO_TREC);
+
+ FOR_EACH_ENTRY(my_execution, e, {
+ StgTVar *s = e -> tvar;
+ StgTVarWatchQueue *q = alloc_stg_tvar_watch_queue(cap, (StgClosure*)inv);
+ StgTVarWatchQueue *fq = s -> first_watch_queue_entry;
+
+ // We leave "last_execution" holding the values that will be
+ // in the heap after the transaction we're in the process
+ // of committing has finished.
+ TRecEntry *entry = get_entry_for(my_execution -> enclosing_trec, s, NULL);
+ if (entry != NULL) {
+ e -> expected_value = entry -> new_value;
+ e -> new_value = entry -> new_value;
+ }
+
+ TRACE(" linking trec on tvar=%p value=%p q=%p", s, e -> expected_value, q);
+ q -> next_queue_entry = fq;
+ q -> prev_queue_entry = END_STM_WATCH_QUEUE;
+ if (fq != END_STM_WATCH_QUEUE) {
+ fq -> prev_queue_entry = q;
+ }
+ s -> first_watch_queue_entry = q;
+ });
+
+ inv -> last_execution = my_execution;
+}
+
+/*
+ * Add a new invariant to the trec's list of invariants to check on commit
+ */
+void stmAddInvariantToCheck(Capability *cap,
+ StgTRecHeader *trec,
+ StgClosure *code) {
+ TRACE("%p : stmAddInvariantToCheck closure=%p", trec, code);
+ ASSERT(trec != NO_TREC);
+ ASSERT(trec -> state == TREC_ACTIVE ||
+ trec -> state == TREC_CONDEMNED);
+
+ StgAtomicInvariant *invariant;
+ StgInvariantCheckQueue *q;
+
+ // 1. Allocate an StgAtomicInvariant, set last_execution to NO_TREC
+ // to signal that this is a new invariant in the current atomic block
+
+ invariant = (StgAtomicInvariant *) allocateLocal(cap, sizeofW(StgAtomicInvariant));
+ TRACE("%p : stmAddInvariantToCheck allocated invariant=%p", trec, invariant);
+ SET_HDR (invariant, &stg_ATOMIC_INVARIANT_info, CCS_SYSTEM);
+ invariant -> code = code;
+ invariant -> last_execution = NO_TREC;
+
+ // 2. Allocate an StgInvariantCheckQueue entry, link it to the current trec
+
+ q = alloc_stg_invariant_check_queue(cap, invariant);
+ TRACE("%p : stmAddInvariantToCheck allocated q=%p", trec, q);
+ q -> invariant = invariant;
+ q -> my_execution = NO_TREC;
+ q -> next_queue_entry = trec -> invariants_to_check;
+ trec -> invariants_to_check = q;
+
+ TRACE("%p : stmAddInvariantToCheck done", trec);
+}
+
+/*
+ * Fill in the trec's list of invariants that might be violated by the
+ * current transaction.
+ */
+
+StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *trec) {
+ TRACE("%p : stmGetInvariantsToCheck, head was %p",
+ trec,
+ trec -> invariants_to_check);
+
+ ASSERT(trec != NO_TREC);
+ ASSERT ((trec -> state == TREC_ACTIVE) ||
+ (trec -> state == TREC_WAITING) ||
+ (trec -> state == TREC_CONDEMNED));
+ ASSERT(trec -> enclosing_trec == NO_TREC);
+
+ lock_stm(trec);
+ StgTRecChunk *c = trec -> current_chunk;
+ while (c != END_STM_CHUNK_LIST) {
+ unsigned int i;
+ for (i = 0; i < c -> next_entry_idx; i ++) {
+ TRecEntry *e = &(c -> entries[i]);
+ if (entry_is_update(e)) {
+ StgTVar *s = e -> tvar;
+ StgClosure *old = lock_tvar(trec, s);
+
+ // Pick up any invariants on the TVar being updated
+ // by entry "e"
+
+ TRACE("%p : checking for invariants on %p", trec, s);
+ StgTVarWatchQueue *q;
+ for (q = s -> first_watch_queue_entry;
+ q != END_STM_WATCH_QUEUE;
+ q = q -> next_queue_entry) {
+ if (watcher_is_invariant(q)) {
+ TRACE("%p : Touching invariant %p", trec, q -> closure);
+ StgBool found = FALSE;
+ StgInvariantCheckQueue *q2;
+ for (q2 = trec -> invariants_to_check;
+ q2 != END_INVARIANT_CHECK_QUEUE;
+ q2 = q2 -> next_queue_entry) {
+ if (q2 -> invariant == (StgAtomicInvariant*)(q -> closure)) {
+ TRACE("%p : Already found %p", trec, q -> closure);
+ found = TRUE;
+ break;
+ }
+ }
+
+ if (!found) {
+ TRACE("%p : Not already found %p", trec, q -> closure);
+ StgInvariantCheckQueue *q3;
+ q3 = alloc_stg_invariant_check_queue(cap,
+ (StgAtomicInvariant*) q -> closure);
+ q3 -> next_queue_entry = trec -> invariants_to_check;
+ trec -> invariants_to_check = q3;
+ }
+ }
+ }
+
+ unlock_tvar(trec, s, old, FALSE);
+ }
+ }
+ c = c -> prev_chunk;
+ }
+
+ unlock_stm(trec);
+
+ TRACE("%p : stmGetInvariantsToCheck, head now %p",
+ trec,
+ trec -> invariants_to_check);
+
+ return (trec -> invariants_to_check);
+}
+
+/*......................................................................*/
+
StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
int result;
StgInt64 max_commits_at_start = max_commits;
- TRACE("%p : stmCommitTransaction()\n", trec);
+ TRACE("%p : stmCommitTransaction()", trec);
ASSERT (trec != NO_TREC);
lock_stm(trec);
@@ -933,15 +1283,70 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
ASSERT ((trec -> state == TREC_ACTIVE) ||
(trec -> state == TREC_CONDEMNED));
+ // touched_invariants is true if we've written to a TVar with invariants
+ // attached to it, or if we're trying to add a new invariant to the system.
+
+ StgBool touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);
+
+ // If we have touched invariants then (i) lock the invariant, and (ii) add
+ // the invariant's read set to our own. Step (i) is needed to serialize
+ // concurrent transactions that attempt to make conflicting updates
+ // to the invariant's trec (suppose it read from t1 and t2, and that one
+ // concurrent transcation writes only to t1, and a second writes only to
+ // t2). Step (ii) is needed so that both transactions will lock t1 and t2
+ // to gain access to their wait lists (and hence be able to unhook the
+ // invariant from both tvars).
+
+ if (touched_invariants) {
+ TRACE("%p : locking invariants", trec);
+ StgInvariantCheckQueue *q = trec -> invariants_to_check;
+ while (q != END_INVARIANT_CHECK_QUEUE) {
+ TRACE("%p : locking invariant %p", trec, q -> invariant);
+ StgAtomicInvariant *inv = q -> invariant;
+ if (!lock_inv(inv)) {
+ TRACE("%p : failed to lock %p", trec, inv);
+ trec -> state = TREC_CONDEMNED;
+ break;
+ }
+
+ StgTRecHeader *inv_old_trec = inv -> last_execution;
+ if (inv_old_trec != NO_TREC) {
+ StgTRecChunk *c = inv_old_trec -> current_chunk;
+ while (c != END_STM_CHUNK_LIST) {
+ unsigned int i;
+ for (i = 0; i < c -> next_entry_idx; i ++) {
+ TRecEntry *e = &(c -> entries[i]);
+ TRACE("%p : ensuring we lock TVars for %p", trec, e -> tvar);
+ merge_read_into (cap, trec, e -> tvar, e -> expected_value);
+ }
+ c = c -> prev_chunk;
+ }
+ }
+ q = q -> next_queue_entry;
+ }
+ TRACE("%p : finished locking invariants", trec);
+ }
+
+ // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
+ // (i) the configuration lets us use a read phase, and (ii) we've not
+ // touched or introduced any invariants.
+ //
+ // In principle we could extend the implementation to support a read-phase
+ // and invariants, but it complicates the logic: the links between
+ // invariants and TVars are managed by the TVar watch queues which are
+ // protected by the TVar's locks.
+
+ StgBool use_read_phase = ((config_use_read_phase) && (!touched_invariants));
+
result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
if (result) {
// We now know that all the updated locations hold their expected values.
ASSERT (trec -> state == TREC_ACTIVE);
if (use_read_phase) {
- TRACE("%p : doing read check\n", trec);
+ TRACE("%p : doing read check", trec);
result = check_read_only(trec);
- TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
+ TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
StgInt64 max_commits_at_end = max_commits;
StgInt64 max_concurrent_commits;
@@ -956,16 +1361,38 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
// We now know that all of the read-only locations held their exepcted values
// at the end of the call to validate_and_acquire_ownership. This forms the
// linearization point of the commit.
-
+
+ // 1. If we have touched or introduced any invariants then unhook them
+ // from the TVars they depended on last time they were executed
+ // and hook them on the TVars that they now depend on.
+ if (touched_invariants) {
+ StgInvariantCheckQueue *q = trec -> invariants_to_check;
+ while (q != END_INVARIANT_CHECK_QUEUE) {
+ StgAtomicInvariant *inv = q -> invariant;
+ if (inv -> last_execution != NO_TREC) {
+ disconnect_invariant(cap, inv);
+ }
+
+ TRACE("%p : hooking up new execution trec=%p", trec, q -> my_execution);
+ connect_invariant_to_trec(cap, inv, q -> my_execution);
+
+ TRACE("%p : unlocking invariant %p", trec, inv);
+ unlock_inv(inv);
+
+ q = q -> next_queue_entry;
+ }
+ }
+
+ // 2. Make the updates required by the transaction
FOR_EACH_ENTRY(trec, e, {
StgTVar *s;
s = e -> tvar;
- if (e -> new_value != e -> expected_value) {
- // Entry is an update: write the value back to the TVar, unlocking it if
- // necessary.
+ if ((!use_read_phase) || (e -> new_value != e -> expected_value)) {
+ // Either the entry is an update or we're not using a read phase:
+ // write the value back to the TVar, unlocking it if necessary.
ACQ_ASSERT(tvar_is_locked(s, trec));
- TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
+ TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
unpark_waiters_on(cap,s);
IF_STM_FG_LOCKS({
s -> num_updates ++;
@@ -983,7 +1410,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
free_stg_trec_header(cap, trec);
- TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
+ TRACE("%p : stmCommitTransaction()=%d", trec, result);
return result;
}
@@ -994,18 +1421,18 @@ StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
StgTRecHeader *et;
int result;
ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
- TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
+ TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
lock_stm(trec);
et = trec -> enclosing_trec;
- result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
+ result = validate_and_acquire_ownership(trec, (!config_use_read_phase), TRUE);
if (result) {
// We now know that all the updated locations hold their expected values.
- if (use_read_phase) {
- TRACE("%p : doing read check\n", trec);
+ if (config_use_read_phase) {
+ TRACE("%p : doing read check", trec);
result = check_read_only(trec);
}
if (result) {
@@ -1013,23 +1440,21 @@ StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
// at the end of the call to validate_and_acquire_ownership. This forms the
// linearization point of the commit.
- if (result) {
- TRACE("%p : read-check succeeded\n", trec);
- FOR_EACH_ENTRY(trec, e, {
- // Merge each entry into the enclosing transaction record, release all
- // locks.
-
- StgTVar *s;
- s = e -> tvar;
- if (entry_is_update(e)) {
- unlock_tvar(trec, s, e -> expected_value, FALSE);
- }
- merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
- ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
- });
- } else {
- revert_ownership(trec, FALSE);
- }
+ TRACE("%p : read-check succeeded", trec);
+ FOR_EACH_ENTRY(trec, e, {
+ // Merge each entry into the enclosing transaction record, release all
+ // locks.
+
+ StgTVar *s;
+ s = e -> tvar;
+ if (entry_is_update(e)) {
+ unlock_tvar(trec, s, e -> expected_value, FALSE);
+ }
+ merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
+ ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
+ });
+ } else {
+ revert_ownership(trec, FALSE);
}
}
@@ -1037,7 +1462,7 @@ StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
free_stg_trec_header(cap, trec);
- TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
+ TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);
return result;
}
@@ -1046,7 +1471,7 @@ StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
int result;
- TRACE("%p : stmWait(%p)\n", trec, tso);
+ TRACE("%p : stmWait(%p)", trec, tso);
ASSERT (trec != NO_TREC);
ASSERT (trec -> enclosing_trec == NO_TREC);
ASSERT ((trec -> state == TREC_ACTIVE) ||
@@ -1062,7 +1487,7 @@ StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
// Put ourselves to sleep. We retain locks on all the TVars involved
// until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
// in the TSO, (c) TREC_WAITING in the Trec.
- build_wait_queue_entries_for_trec(cap, tso, trec);
+ build_watch_queue_entries_for_trec(cap, tso, trec);
park_tso(tso);
trec -> state = TREC_WAITING;
@@ -1079,7 +1504,7 @@ StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
free_stg_trec_header(cap, trec);
}
- TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
+ TRACE("%p : stmWait(%p)=%d", trec, tso, result);
return result;
}
@@ -1096,7 +1521,7 @@ StgBool stmReWait(Capability *cap, StgTSO *tso) {
int result;
StgTRecHeader *trec = tso->trec;
- TRACE("%p : stmReWait\n", trec);
+ TRACE("%p : stmReWait", trec);
ASSERT (trec != NO_TREC);
ASSERT (trec -> enclosing_trec == NO_TREC);
ASSERT ((trec -> state == TREC_WAITING) ||
@@ -1104,7 +1529,7 @@ StgBool stmReWait(Capability *cap, StgTSO *tso) {
lock_stm(trec);
result = validate_and_acquire_ownership(trec, TRUE, TRUE);
- TRACE("%p : validation %s\n", trec, result ? "succeeded" : "failed");
+ TRACE("%p : validation %s", trec, result ? "succeeded" : "failed");
if (result) {
// The transaction remains valid -- do nothing because it is already on
// the wait queues
@@ -1115,52 +1540,30 @@ StgBool stmReWait(Capability *cap, StgTSO *tso) {
// The transcation has become invalid. We can now remove it from the wait
// queues.
if (trec -> state != TREC_CONDEMNED) {
- remove_wait_queue_entries_for_trec (cap, trec);
+ remove_watch_queue_entries_for_trec (cap, trec);
}
free_stg_trec_header(cap, trec);
}
unlock_stm(trec);
- TRACE("%p : stmReWait()=%d\n", trec, result);
+ TRACE("%p : stmReWait()=%d", trec, result);
return result;
}
/*......................................................................*/
-static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
- TRecEntry *result = NULL;
-
- TRACE("%p : get_entry_for TVar %p\n", trec, tvar);
- ASSERT(trec != NO_TREC);
-
- do {
- FOR_EACH_ENTRY(trec, e, {
- if (e -> tvar == tvar) {
- result = e;
- if (in != NULL) {
- *in = trec;
- }
- BREAK_FOR_EACH;
- }
- });
- trec = trec -> enclosing_trec;
- } while (result == NULL && trec != NO_TREC);
-
- return result;
-}
-
static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
StgClosure *result;
result = tvar -> current_value;
#if defined(STM_FG_LOCKS)
while (GET_INFO(result) == &stg_TREC_HEADER_info) {
- TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, result);
+ TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
result = tvar -> current_value;
}
#endif
- TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, result);
+ TRACE("%p : read_current_value(%p)=%p", trec, tvar, result);
return result;
}
@@ -1172,7 +1575,7 @@ StgClosure *stmReadTVar(Capability *cap,
StgTRecHeader *entry_in;
StgClosure *result = NULL;
TRecEntry *entry = NULL;
- TRACE("%p : stmReadTVar(%p)\n", trec, tvar);
+ TRACE("%p : stmReadTVar(%p)", trec, tvar);
ASSERT (trec != NO_TREC);
ASSERT (trec -> state == TREC_ACTIVE ||
trec -> state == TREC_CONDEMNED);
@@ -1201,7 +1604,7 @@ StgClosure *stmReadTVar(Capability *cap,
result = current_value;
}
- TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, result);
+ TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
return result;
}
@@ -1214,7 +1617,7 @@ void stmWriteTVar(Capability *cap,
StgTRecHeader *entry_in;
TRecEntry *entry = NULL;
- TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value);
+ TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
ASSERT (trec != NO_TREC);
ASSERT (trec -> state == TREC_ACTIVE ||
trec -> state == TREC_CONDEMNED);
@@ -1241,7 +1644,7 @@ void stmWriteTVar(Capability *cap,
new_entry -> new_value = new_value;
}
- TRACE("%p : stmWriteTVar done\n", trec);
+ TRACE("%p : stmWriteTVar done", trec);
}
/*......................................................................*/
@@ -1252,7 +1655,7 @@ StgTVar *stmNewTVar(Capability *cap,
result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
result -> current_value = new_value;
- result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
+ result -> first_watch_queue_entry = END_STM_WATCH_QUEUE;
#if defined(THREADED_RTS)
result -> num_updates = 0;
#endif
diff --git a/rts/Sanity.c b/rts/Sanity.c
index 33ec988106..48d913c46e 100644
--- a/rts/Sanity.c
+++ b/rts/Sanity.c
@@ -447,19 +447,36 @@ checkClosure( StgClosure* p )
#endif
- case TVAR_WAIT_QUEUE:
+ case TVAR_WATCH_QUEUE:
{
- StgTVarWaitQueue *wq = (StgTVarWaitQueue *)p;
+ StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p;
ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->next_queue_entry));
ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->prev_queue_entry));
- return sizeofW(StgTVarWaitQueue);
+ return sizeofW(StgTVarWatchQueue);
+ }
+
+ case INVARIANT_CHECK_QUEUE:
+ {
+ StgInvariantCheckQueue *q = (StgInvariantCheckQueue *)p;
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->invariant));
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->my_execution));
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->next_queue_entry));
+ return sizeofW(StgInvariantCheckQueue);
+ }
+
+ case ATOMIC_INVARIANT:
+ {
+ StgAtomicInvariant *invariant = (StgAtomicInvariant *)p;
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->code));
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->last_execution));
+ return sizeofW(StgAtomicInvariant);
}
case TVAR:
{
StgTVar *tv = (StgTVar *)p;
ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->current_value));
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->first_wait_queue_entry));
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->first_watch_queue_entry));
return sizeofW(StgTVar);
}
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 585ddec0ef..0e54b65402 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -3039,8 +3039,9 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).
- We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
- despite the similar implementation.
+ We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
+ create) because retries are not considered to be exceptions, despite the
+ similar implementation.
We should not expect to see CATCH_FRAME or STOP_FRAME because those should
not be created within memory transactions.
@@ -3060,7 +3061,7 @@ findRetryFrameHelper (StgTSO *tso)
case ATOMICALLY_FRAME:
debugTrace(DEBUG_stm,
- "found ATOMICALLY_FRAME at %p during retrry", p);
+ "found ATOMICALLY_FRAME at %p during retry", p);
tso->sp = p;
return ATOMICALLY_FRAME;
@@ -3070,7 +3071,20 @@ findRetryFrameHelper (StgTSO *tso)
tso->sp = p;
return CATCH_RETRY_FRAME;
- case CATCH_STM_FRAME:
+ case CATCH_STM_FRAME: {
+ debugTrace(DEBUG_stm,
+ "found CATCH_STM_FRAME at %p during retry", p);
+ StgTRecHeader *trec = tso -> trec;
+ StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+ debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
+ stmAbortTransaction(tso -> cap, trec);
+ stmFreeAbortedTRec(tso -> cap, trec);
+ tso -> trec = outer;
+ p = next;
+ continue;
+ }
+
+
default:
ASSERT(info->i.type != CATCH_FRAME);
ASSERT(info->i.type != STOP_FRAME);
diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm
index fca5bf4018..0323618ba0 100644
--- a/rts/StgMiscClosures.cmm
+++ b/rts/StgMiscClosures.cmm
@@ -520,8 +520,14 @@ INFO_TABLE(stg_EMPTY_MVAR,3,0,MVAR,"MVAR","MVAR")
INFO_TABLE(stg_TVAR, 0, 0, TVAR, "TVAR", "TVAR")
{ foreign "C" barf("TVAR object entered!"); }
-INFO_TABLE(stg_TVAR_WAIT_QUEUE, 0, 0, TVAR_WAIT_QUEUE, "TVAR_WAIT_QUEUE", "TVAR_WAIT_QUEUE")
-{ foreign "C" barf("TVAR_WAIT_QUEUE object entered!"); }
+INFO_TABLE(stg_TVAR_WATCH_QUEUE, 0, 0, TVAR_WATCH_QUEUE, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE")
+{ foreign "C" barf("TVAR_WATCH_QUEUE object entered!"); }
+
+INFO_TABLE(stg_ATOMIC_INVARIANT, 0, 0, ATOMIC_INVARIANT, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT")
+{ foreign "C" barf("ATOMIC_INVARIANT object entered!"); }
+
+INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 0, 0, INVARIANT_CHECK_QUEUE, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE")
+{ foreign "C" barf("INVARIANT_CHECK_QUEUE object entered!"); }
INFO_TABLE(stg_TREC_CHUNK, 0, 0, TREC_CHUNK, "TREC_CHUNK", "TREC_CHUNK")
{ foreign "C" barf("TREC_CHUNK object entered!"); }
@@ -529,8 +535,11 @@ INFO_TABLE(stg_TREC_CHUNK, 0, 0, TREC_CHUNK, "TREC_CHUNK", "TREC_CHUNK")
INFO_TABLE(stg_TREC_HEADER, 0, 0, TREC_HEADER, "TREC_HEADER", "TREC_HEADER")
{ foreign "C" barf("TREC_HEADER object entered!"); }
-INFO_TABLE_CONSTR(stg_END_STM_WAIT_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_WAIT_QUEUE","END_STM_WAIT_QUEUE")
-{ foreign "C" barf("END_STM_WAIT_QUEUE object entered!"); }
+INFO_TABLE_CONSTR(stg_END_STM_WATCH_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_WATCH_QUEUE","END_STM_WATCH_QUEUE")
+{ foreign "C" barf("END_STM_WATCH_QUEUE object entered!"); }
+
+INFO_TABLE_CONSTR(stg_END_INVARIANT_CHECK_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_INVARIANT_CHECK_QUEUE","END_INVARIANT_CHECK_QUEUE")
+{ foreign "C" barf("END_INVARIANT_CHECK_QUEUE object entered!"); }
INFO_TABLE_CONSTR(stg_END_STM_CHUNK_LIST,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_CHUNK_LIST","END_STM_CHUNK_LIST")
{ foreign "C" barf("END_STM_CHUNK_LIST object entered!"); }
@@ -538,7 +547,9 @@ INFO_TABLE_CONSTR(stg_END_STM_CHUNK_LIST,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_CHUN
INFO_TABLE_CONSTR(stg_NO_TREC,0,0,0,CONSTR_NOCAF_STATIC,"NO_TREC","NO_TREC")
{ foreign "C" barf("NO_TREC object entered!"); }
-CLOSURE(stg_END_STM_WAIT_QUEUE_closure,stg_END_STM_WAIT_QUEUE);
+CLOSURE(stg_END_STM_WATCH_QUEUE_closure,stg_END_STM_WATCH_QUEUE);
+
+CLOSURE(stg_END_INVARIANT_CHECK_QUEUE_closure,stg_END_INVARIANT_CHECK_QUEUE);
CLOSURE(stg_END_STM_CHUNK_LIST_closure,stg_END_STM_CHUNK_LIST);