diff options
Diffstat (limited to 'compiler/rustc_data_structures/src/sync.rs')
-rw-r--r-- | compiler/rustc_data_structures/src/sync.rs | 238 |
1 files changed, 203 insertions, 35 deletions
diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index e73ca56efa0..8a778866a77 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -39,6 +39,7 @@ //! //! [^2] `MTLockRef` is a typedef. +pub use crate::marker::*; use crate::owned_slice::OwnedSlice; use std::collections::HashMap; use std::hash::{BuildHasher, Hash}; @@ -55,6 +56,42 @@ pub use vec::{AppendOnlyIndexVec, AppendOnlyVec}; mod vec; +mod mode { + use super::Ordering; + use std::sync::atomic::AtomicU8; + + const UNINITIALIZED: u8 = 0; + const DYN_NOT_THREAD_SAFE: u8 = 1; + const DYN_THREAD_SAFE: u8 = 2; + + static DYN_THREAD_SAFE_MODE: AtomicU8 = AtomicU8::new(UNINITIALIZED); + + // Whether thread safety is enabled (due to running under multiple threads). + #[inline] + pub fn is_dyn_thread_safe() -> bool { + match DYN_THREAD_SAFE_MODE.load(Ordering::Relaxed) { + DYN_NOT_THREAD_SAFE => false, + DYN_THREAD_SAFE => true, + _ => panic!("uninitialized dyn_thread_safe mode!"), + } + } + + // Only set by the `-Z threads` compile option + pub fn set_dyn_thread_safe_mode(mode: bool) { + let set: u8 = if mode { DYN_THREAD_SAFE } else { DYN_NOT_THREAD_SAFE }; + let previous = DYN_THREAD_SAFE_MODE.compare_exchange( + UNINITIALIZED, + set, + Ordering::Relaxed, + Ordering::Relaxed, + ); + + // Check that the mode was either uninitialized or was already set to the requested mode. + assert!(previous.is_ok() || previous == Err(set)); + } +} + +pub use mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode}; cfg_if! { if #[cfg(not(parallel_compiler))] { pub unsafe auto trait Send {} @@ -149,7 +186,7 @@ cfg_if! { #[macro_export] macro_rules! parallel { - ($($blocks:tt),*) => { + ($($blocks:block),*) => { // We catch panics here ensuring that all the blocks execute. // This makes behavior consistent with the parallel compiler. let mut panic = None; @@ -168,12 +205,6 @@ cfg_if! { } } - pub use Iterator as ParallelIterator; - - pub fn par_iter<T: IntoIterator>(t: T) -> T::IntoIter { - t.into_iter() - } - pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item) + Sync + Send) { // We catch panics here ensuring that all the loop iterations execute. // This makes behavior consistent with the parallel compiler. @@ -190,6 +221,29 @@ cfg_if! { } } + pub fn par_map<T: IntoIterator, R, C: FromIterator<R>>( + t: T, + mut map: impl FnMut(<<T as IntoIterator>::IntoIter as Iterator>::Item) -> R, + ) -> C { + // We catch panics here ensuring that all the loop iterations execute. + let mut panic = None; + let r = t.into_iter().filter_map(|i| { + match catch_unwind(AssertUnwindSafe(|| map(i))) { + Ok(r) => Some(r), + Err(p) => { + if panic.is_none() { + panic = Some(p); + } + None + } + } + }).collect(); + if let Some(panic) = panic { + resume_unwind(panic); + } + r + } + pub type MetadataRef = OwnedSlice; pub use std::rc::Rc as Lrc; @@ -302,46 +356,165 @@ cfg_if! { use parking_lot::RwLock as InnerRwLock; use std::thread; - pub use rayon::{join, scope}; + + #[inline] + pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB) + where + A: FnOnce() -> RA + DynSend, + B: FnOnce() -> RB + DynSend, + { + if mode::is_dyn_thread_safe() { + let oper_a = FromDyn::from(oper_a); + let oper_b = FromDyn::from(oper_b); + let (a, b) = rayon::join(move || FromDyn::from(oper_a.into_inner()()), move || FromDyn::from(oper_b.into_inner()())); + (a.into_inner(), b.into_inner()) + } else { + (oper_a(), oper_b()) + } + } + + // This function only works when `mode::is_dyn_thread_safe()`. + pub fn scope<'scope, OP, R>(op: OP) -> R + where + OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend, + R: DynSend, + { + let op = FromDyn::from(op); + rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner() + } /// Runs a list of blocks in parallel. The first block is executed immediately on /// the current thread. Use that for the longest running block. #[macro_export] macro_rules! parallel { - (impl $fblock:tt [$($c:tt,)*] [$block:tt $(, $rest:tt)*]) => { + (impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => { parallel!(impl $fblock [$block, $($c,)*] [$($rest),*]) }; - (impl $fblock:tt [$($blocks:tt,)*] []) => { + (impl $fblock:block [$($blocks:expr,)*] []) => { ::rustc_data_structures::sync::scope(|s| { + $(let block = rustc_data_structures::sync::FromDyn::from(|| $blocks); + s.spawn(move |_| block.into_inner()());)* + (|| $fblock)(); + }); + }; + ($fblock:block, $($blocks:block),*) => { + if rustc_data_structures::sync::is_dyn_thread_safe() { + // Reverse the order of the later blocks since Rayon executes them in reverse order + // when using a single thread. This ensures the execution order matches that + // of a single threaded rustc. + parallel!(impl $fblock [] [$($blocks),*]); + } else { + // We catch panics here ensuring that all the blocks execute. + // This makes behavior consistent with the parallel compiler. + let mut panic = None; + if let Err(p) = ::std::panic::catch_unwind( + ::std::panic::AssertUnwindSafe(|| $fblock) + ) { + if panic.is_none() { + panic = Some(p); + } + } $( - s.spawn(|_| $blocks); + if let Err(p) = ::std::panic::catch_unwind( + ::std::panic::AssertUnwindSafe(|| $blocks) + ) { + if panic.is_none() { + panic = Some(p); + } + } )* - $fblock; - }) - }; - ($fblock:tt, $($blocks:tt),*) => { - // Reverse the order of the later blocks since Rayon executes them in reverse order - // when using a single thread. This ensures the execution order matches that - // of a single threaded rustc - parallel!(impl $fblock [] [$($blocks),*]); + if let Some(panic) = panic { + ::std::panic::resume_unwind(panic); + } + } }; } - pub use rayon::iter::ParallelIterator; - use rayon::iter::IntoParallelIterator; + use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator}; + + pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>( + t: T, + for_each: impl Fn(I) + DynSync + DynSend + ) { + if mode::is_dyn_thread_safe() { + let for_each = FromDyn::from(for_each); + let panic: Lock<Option<_>> = Lock::new(None); + t.into_par_iter().for_each(|i| if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) { + let mut l = panic.lock(); + if l.is_none() { + *l = Some(p) + } + }); - pub fn par_iter<T: IntoParallelIterator>(t: T) -> T::Iter { - t.into_par_iter() + if let Some(panic) = panic.into_inner() { + resume_unwind(panic); + } + } else { + // We catch panics here ensuring that all the loop iterations execute. + // This makes behavior consistent with the parallel compiler. + let mut panic = None; + t.into_iter().for_each(|i| { + if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) { + if panic.is_none() { + panic = Some(p); + } + } + }); + if let Some(panic) = panic { + resume_unwind(panic); + } + } } - pub fn par_for_each_in<T: IntoParallelIterator>( + pub fn par_map< + I, + T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>, + R: std::marker::Send, + C: FromIterator<R> + FromParallelIterator<R> + >( t: T, - for_each: impl Fn(T::Item) + Sync + Send, - ) { - let ps: Vec<_> = t.into_par_iter().map(|i| catch_unwind(AssertUnwindSafe(|| for_each(i)))).collect(); - ps.into_iter().for_each(|p| if let Err(panic) = p { - resume_unwind(panic) - }); + map: impl Fn(I) -> R + DynSync + DynSend + ) -> C { + if mode::is_dyn_thread_safe() { + let panic: Lock<Option<_>> = Lock::new(None); + let map = FromDyn::from(map); + // We catch panics here ensuring that all the loop iterations execute. + let r = t.into_par_iter().filter_map(|i| { + match catch_unwind(AssertUnwindSafe(|| map(i))) { + Ok(r) => Some(r), + Err(p) => { + let mut l = panic.lock(); + if l.is_none() { + *l = Some(p); + } + None + }, + } + }).collect(); + + if let Some(panic) = panic.into_inner() { + resume_unwind(panic); + } + r + } else { + // We catch panics here ensuring that all the loop iterations execute. + let mut panic = None; + let r = t.into_iter().filter_map(|i| { + match catch_unwind(AssertUnwindSafe(|| map(i))) { + Ok(r) => Some(r), + Err(p) => { + if panic.is_none() { + panic = Some(p); + } + None + } + } + }).collect(); + if let Some(panic) = panic { + resume_unwind(panic); + } + r + } } pub type MetadataRef = OwnedSlice; @@ -352,11 +525,6 @@ cfg_if! { } } -pub fn assert_sync<T: ?Sized + Sync>() {} -pub fn assert_send<T: ?Sized + Send>() {} -pub fn assert_send_val<T: ?Sized + Send>(_t: &T) {} -pub fn assert_send_sync_val<T: ?Sized + Sync + Send>(_t: &T) {} - #[derive(Default)] #[cfg_attr(parallel_compiler, repr(align(64)))] pub struct CacheAligned<T>(pub T); |