Skip to content

Commit

Permalink
store futexes in per-allocation data rather than globally
Browse files Browse the repository at this point in the history
  • Loading branch information
RalfJung committed Oct 14, 2024
1 parent dba6c8b commit 442a5bc
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 116 deletions.
194 changes: 116 additions & 78 deletions src/concurrency/sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::cell::RefCell;
use std::collections::VecDeque;
use std::collections::hash_map::Entry;
use std::ops::Not;
use std::rc::Rc;
use std::time::Duration;

use rustc_data_structures::fx::FxHashMap;
Expand Down Expand Up @@ -121,6 +123,15 @@ struct Futex {
clock: VClock,
}

#[derive(Default, Clone)]
pub struct FutexRef(Rc<RefCell<Futex>>);

impl VisitProvenance for FutexRef {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// No provenance
}
}

/// A thread waiting on a futex.
#[derive(Debug)]
struct FutexWaiter {
Expand All @@ -137,9 +148,6 @@ pub struct SynchronizationObjects {
rwlocks: IndexVec<RwLockId, RwLock>,
condvars: IndexVec<CondvarId, Condvar>,
pub(super) init_onces: IndexVec<InitOnceId, InitOnce>,

/// Futex info for the futex at the given address.
futexes: FxHashMap<u64, Futex>,
}

// Private extension trait for local helper methods
Expand Down Expand Up @@ -184,7 +192,7 @@ impl SynchronizationObjects {
}

impl<'tcx> AllocExtra<'tcx> {
pub fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> {
fn get_sync<T: 'static>(&self, offset: Size) -> Option<&T> {
self.sync.get(&offset).and_then(|s| s.downcast_ref::<T>())
}
}
Expand All @@ -193,75 +201,104 @@ impl<'tcx> AllocExtra<'tcx> {
/// If `init` is set to this, we consider the primitive initialized.
pub const LAZY_INIT_COOKIE: u32 = 0xcafe_affe;

/// Helper for lazily initialized `alloc_extra.sync` data:
/// this forces an immediate init.
pub fn lazy_sync_init<'tcx, T: 'static + Copy>(
ecx: &mut MiriInterpCx<'tcx>,
primitive: &MPlaceTy<'tcx>,
init_offset: Size,
data: T,
) -> InterpResult<'tcx> {
let (alloc, offset, _) = ecx.ptr_get_alloc_id(primitive.ptr(), 0)?;
let (alloc_extra, _machine) = ecx.get_alloc_extra_mut(alloc)?;
alloc_extra.sync.insert(offset, Box::new(data));
// Mark this as "initialized".
let init_field = primitive.offset(init_offset, ecx.machine.layouts.u32, ecx)?;
ecx.write_scalar_atomic(
Scalar::from_u32(LAZY_INIT_COOKIE),
&init_field,
AtomicWriteOrd::Relaxed,
)?;
interp_ok(())
}

/// Helper for lazily initialized `alloc_extra.sync` data:
/// Checks if the primitive is initialized, and return its associated data if so.
/// Otherwise, calls `new_data` to initialize the primitive.
pub fn lazy_sync_get_data<'tcx, T: 'static + Copy>(
ecx: &mut MiriInterpCx<'tcx>,
primitive: &MPlaceTy<'tcx>,
init_offset: Size,
name: &str,
new_data: impl FnOnce(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, T>,
) -> InterpResult<'tcx, T> {
// Check if this is already initialized. Needs to be atomic because we can race with another
// thread initializing. Needs to be an RMW operation to ensure we read the *latest* value.
// So we just try to replace MUTEX_INIT_COOKIE with itself.
let init_cookie = Scalar::from_u32(LAZY_INIT_COOKIE);
let init_field = primitive.offset(init_offset, ecx.machine.layouts.u32, ecx)?;
let (_init, success) = ecx
.atomic_compare_exchange_scalar(
&init_field,
&ImmTy::from_scalar(init_cookie, ecx.machine.layouts.u32),
init_cookie,
AtomicRwOrd::Relaxed,
AtomicReadOrd::Relaxed,
/* can_fail_spuriously */ false,
)?
.to_scalar_pair();

if success.to_bool()? {
// If it is initialized, it must be found in the "sync primitive" table,
// or else it has been moved illegally.
let (alloc, offset, _) = ecx.ptr_get_alloc_id(primitive.ptr(), 0)?;
let alloc_extra = ecx.get_alloc_extra(alloc)?;
let data = alloc_extra
.get_sync::<T>(offset)
.ok_or_else(|| err_ub_format!("`{name}` can't be moved after first use"))?;
interp_ok(*data)
} else {
let data = new_data(ecx)?;
lazy_sync_init(ecx, primitive, init_offset, data)?;
interp_ok(data)
}
}

// Public interface to synchronization primitives. Please note that in most
// cases, the function calls are infallible and it is the client's (shim
// implementation's) responsibility to detect and deal with erroneous
// situations.
impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
/// Helper for lazily initialized `alloc_extra.sync` data:
/// this forces an immediate init.
fn lazy_sync_init<T: 'static + Copy>(
&mut self,
primitive: &MPlaceTy<'tcx>,
init_offset: Size,
data: T,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();

let (alloc, offset, _) = this.ptr_get_alloc_id(primitive.ptr(), 0)?;
let (alloc_extra, _machine) = this.get_alloc_extra_mut(alloc)?;
alloc_extra.sync.insert(offset, Box::new(data));
// Mark this as "initialized".
let init_field = primitive.offset(init_offset, this.machine.layouts.u32, this)?;
this.write_scalar_atomic(
Scalar::from_u32(LAZY_INIT_COOKIE),
&init_field,
AtomicWriteOrd::Relaxed,
)?;
interp_ok(())
}

/// Helper for lazily initialized `alloc_extra.sync` data:
/// Checks if the primitive is initialized, and return its associated data if so.
/// Otherwise, calls `new_data` to initialize the primitive.
fn lazy_sync_get_data<T: 'static + Copy>(
&mut self,
primitive: &MPlaceTy<'tcx>,
init_offset: Size,
name: &str,
new_data: impl FnOnce(&mut MiriInterpCx<'tcx>) -> InterpResult<'tcx, T>,
) -> InterpResult<'tcx, T> {
let this = self.eval_context_mut();

// Check if this is already initialized. Needs to be atomic because we can race with another
// thread initializing. Needs to be an RMW operation to ensure we read the *latest* value.
// So we just try to replace MUTEX_INIT_COOKIE with itself.
let init_cookie = Scalar::from_u32(LAZY_INIT_COOKIE);
let init_field = primitive.offset(init_offset, this.machine.layouts.u32, this)?;
let (_init, success) = this
.atomic_compare_exchange_scalar(
&init_field,
&ImmTy::from_scalar(init_cookie, this.machine.layouts.u32),
init_cookie,
AtomicRwOrd::Relaxed,
AtomicReadOrd::Relaxed,
/* can_fail_spuriously */ false,
)?
.to_scalar_pair();

if success.to_bool()? {
// If it is initialized, it must be found in the "sync primitive" table,
// or else it has been moved illegally.
let (alloc, offset, _) = this.ptr_get_alloc_id(primitive.ptr(), 0)?;
let alloc_extra = this.get_alloc_extra(alloc)?;
let data = alloc_extra
.get_sync::<T>(offset)
.ok_or_else(|| err_ub_format!("`{name}` can't be moved after first use"))?;
interp_ok(*data)
} else {
let data = new_data(this)?;
this.lazy_sync_init(primitive, init_offset, data)?;
interp_ok(data)
}
}

/// Get the synchronization primitive associated with the given pointer,
/// or initialize a new one.
fn get_sync_or_init<'a, T: 'static>(
&'a mut self,
ptr: Pointer,
new: impl FnOnce(&'a mut MiriMachine<'tcx>) -> InterpResult<'tcx, T>,
) -> InterpResult<'tcx, &'a T>
where
'tcx: 'a,
{
let this = self.eval_context_mut();
// Ensure there is memory behind this pointer, so that this allocation
// is truly the only place where the data could be stored.
this.check_ptr_access(ptr, Size::from_bytes(1), CheckInAllocMsg::InboundsTest)?;

let (alloc, offset, _) = this.ptr_get_alloc_id(ptr, 0)?;
let (alloc_extra, machine) = this.get_alloc_extra_mut(alloc)?;
// Due to borrow checker reasons, we have to do the lookup twice.
if alloc_extra.get_sync::<T>(offset).is_none() {
let new = new(machine)?;
alloc_extra.sync.insert(offset, Box::new(new));
}
interp_ok(alloc_extra.get_sync::<T>(offset).unwrap())
}

#[inline]
/// Get the id of the thread that currently owns this lock.
fn mutex_get_owner(&mut self, id: MutexId) -> ThreadId {
Expand Down Expand Up @@ -656,7 +693,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
/// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error.
fn futex_wait(
&mut self,
addr: u64,
futex_ref: FutexRef,
bitset: u32,
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
retval_succ: Scalar,
Expand All @@ -666,23 +703,25 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let futex = &mut this.machine.sync.futexes.entry(addr).or_default();
let mut futex = futex_ref.0.borrow_mut();
let waiters = &mut futex.waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
waiters.push_back(FutexWaiter { thread, bitset });
drop(futex);

this.block_thread(
BlockReason::Futex { addr },
BlockReason::Futex,
timeout,
callback!(
@capture<'tcx> {
addr: u64,
futex_ref: FutexRef,
retval_succ: Scalar,
retval_timeout: Scalar,
dest: MPlaceTy<'tcx>,
errno_timeout: Scalar,
}
@unblock = |this| {
let futex = this.machine.sync.futexes.get(&addr).unwrap();
let futex = futex_ref.0.borrow();
// Acquire the clock of the futex.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
Expand All @@ -694,7 +733,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
@timeout = |this| {
// Remove the waiter from the futex.
let thread = this.active_thread();
let futex = this.machine.sync.futexes.get_mut(&addr).unwrap();
let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(errno_timeout)?;
Expand All @@ -706,11 +745,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
}

/// Returns whether anything was woken.
fn futex_wake(&mut self, addr: u64, bitset: u32) -> InterpResult<'tcx, bool> {
fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> {
let this = self.eval_context_mut();
let Some(futex) = this.machine.sync.futexes.get_mut(&addr) else {
return interp_ok(false);
};
let mut futex = futex_ref.0.borrow_mut();
let data_race = &this.machine.data_race;

// Each futex-wake happens-before the end of the futex wait
Expand All @@ -723,7 +760,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
return interp_ok(false);
};
let waiter = futex.waiters.remove(i).unwrap();
this.unblock_thread(waiter.thread, BlockReason::Futex { addr })?;
drop(futex);
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
interp_ok(true)
}
}
2 changes: 1 addition & 1 deletion src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub enum BlockReason {
/// Blocked on a reader-writer lock.
RwLock(RwLockId),
/// Blocked on a Futex variable.
Futex { addr: u64 },
Futex,
/// Blocked on an InitOnce.
InitOnce(InitOnceId),
/// Blocked on epoll.
Expand Down
17 changes: 14 additions & 3 deletions src/shims/unix/linux/sync.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use crate::concurrency::sync::FutexRef;
use crate::*;

struct LinuxFutex {
futex: FutexRef,
}

/// Implementation of the SYS_futex syscall.
/// `args` is the arguments *after* the syscall number.
pub fn futex<'tcx>(
Expand Down Expand Up @@ -29,9 +34,15 @@ pub fn futex<'tcx>(
let op = this.read_scalar(op)?.to_i32()?;
let val = this.read_scalar(val)?.to_i32()?;

// Storing this inside the allocation means that if an address gets reused by a new allocation,
// we'll use an independent futex queue for this... that seems acceptable.
let futex_ref = this
.get_sync_or_init(addr, |_| interp_ok(LinuxFutex { futex: Default::default() }))?
.futex
.clone();

// This is a vararg function so we have to bring our own type for this pointer.
let addr = this.ptr_to_mplace(addr, this.machine.layouts.i32);
let addr_usize = addr.ptr().addr().bytes();

let futex_private = this.eval_libc_i32("FUTEX_PRIVATE_FLAG");
let futex_wait = this.eval_libc_i32("FUTEX_WAIT");
Expand Down Expand Up @@ -159,7 +170,7 @@ pub fn futex<'tcx>(
if val == futex_val {
// The value still matches, so we block the thread and make it wait for FUTEX_WAKE.
this.futex_wait(
addr_usize,
futex_ref,
bitset,
timeout,
Scalar::from_target_isize(0, this), // retval_succ
Expand Down Expand Up @@ -207,7 +218,7 @@ pub fn futex<'tcx>(
let mut n = 0;
#[allow(clippy::arithmetic_side_effects)]
for _ in 0..val {
if this.futex_wake(addr_usize, bitset)? {
if this.futex_wake(&futex_ref, bitset)? {
n += 1;
} else {
break;
Expand Down
12 changes: 4 additions & 8 deletions src/shims/unix/macos/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,11 @@ trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> {
let lock = this.deref_pointer(lock_ptr)?;
// We store the mutex ID in the `sync` metadata. This means that when the lock is moved,
// that's just implicitly creating a new lock at the new location.
let (alloc, offset, _) = this.ptr_get_alloc_id(lock.ptr(), 0)?;
let (alloc_extra, machine) = this.get_alloc_extra_mut(alloc)?;
if let Some(data) = alloc_extra.get_sync::<MacOsUnfairLock>(offset) {
interp_ok(data.id)
} else {
let data = this.get_sync_or_init(lock.ptr(), |machine| {
let id = machine.sync.mutex_create();
alloc_extra.sync.insert(offset, Box::new(MacOsUnfairLock { id }));
interp_ok(id)
}
interp_ok(MacOsUnfairLock { id })
})?;
interp_ok(data.id)
}
}

Expand Down
Loading

0 comments on commit 442a5bc

Please sign in to comment.