Skip to content
Closed
66 changes: 45 additions & 21 deletions src/datadeps/aliasing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -263,28 +263,32 @@ function is_stored(cache::AliasedObjectCacheStore, space::MemorySpace, ainfo::Ab
key = cache.derived[ainfo]
return key in cache.stored[space]
end
function is_key_present(cache::AliasedObjectCacheStore, space::MemorySpace, ainfo::AbstractAliasing)
return haskey(cache.derived, ainfo)
end
function get_stored(cache::AliasedObjectCacheStore, space::MemorySpace, ainfo::AbstractAliasing)
@assert is_stored(cache, space, ainfo) "Cache does not have key $ainfo"
@assert is_stored(cache, space, ainfo) "Cache does not have derived ainfo $ainfo"
key = cache.derived[ainfo]
return cache.values[space][key]
end
function set_stored!(cache::AliasedObjectCacheStore, dest_space::MemorySpace, value::Chunk, ainfo::AbstractAliasing, orig_space::MemorySpace)
@assert !is_stored(cache, dest_space, ainfo) "Cache already has key $ainfo"
if !haskey(cache.derived, ainfo)
push!(cache.keys, ainfo)
cache.derived[ainfo] = ainfo
push!(get!(Set{AbstractAliasing}, cache.stored, orig_space), ainfo)
key = ainfo
else
key = cache.derived[ainfo]
end
function set_stored!(cache::AliasedObjectCacheStore, dest_space::MemorySpace, value::Chunk, ainfo::AbstractAliasing)
@assert !is_stored(cache, dest_space, ainfo) "Cache already has derived ainfo $ainfo"
key = cache.derived[ainfo]
value_ainfo = aliasing(value, identity)
cache.derived[value_ainfo] = key
push!(get!(Set{AbstractAliasing}, cache.stored, dest_space), key)
values_dict = get!(Dict{AbstractAliasing,Chunk}, cache.values, dest_space)
values_dict[key] = value
return
end
function set_key_stored!(cache::AliasedObjectCacheStore, space::MemorySpace, ainfo::AbstractAliasing, value::Chunk)
push!(cache.keys, ainfo)
cache.derived[ainfo] = ainfo
push!(get!(Set{AbstractAliasing}, cache.stored, space), ainfo)
values_dict = get!(Dict{AbstractAliasing,Chunk}, cache.values, space)
values_dict[ainfo] = value
return
end

struct AliasedObjectCache
space::MemorySpace
Expand All @@ -298,6 +302,14 @@ function is_stored(cache::AliasedObjectCache, ainfo::AbstractAliasing)
cache_raw = unwrap(cache.chunk)::AliasedObjectCacheStore
return is_stored(cache_raw, cache.space, ainfo)
end
function is_key_present(cache::AliasedObjectCache, space::MemorySpace, ainfo::AbstractAliasing)
wid = root_worker_id(cache.chunk)
if wid != myid()
return remotecall_fetch(is_key_present, wid, cache, space, ainfo)
end
cache_raw = unwrap(cache.chunk)::AliasedObjectCacheStore
return is_key_present(cache_raw, space, ainfo)
end
function get_stored(cache::AliasedObjectCache, ainfo::AbstractAliasing)
wid = root_worker_id(cache.chunk)
if wid != myid()
Expand All @@ -306,16 +318,32 @@ function get_stored(cache::AliasedObjectCache, ainfo::AbstractAliasing)
cache_raw = unwrap(cache.chunk)::AliasedObjectCacheStore
return get_stored(cache_raw, cache.space, ainfo)
end
function set_stored!(cache::AliasedObjectCache, value::Chunk, ainfo::AbstractAliasing, orig_space::MemorySpace)
function set_stored!(cache::AliasedObjectCache, value::Chunk, ainfo::AbstractAliasing)
wid = root_worker_id(cache.chunk)
if wid != myid()
return remotecall_fetch(set_stored!, wid, cache, value, ainfo, orig_space)
return remotecall_fetch(set_stored!, wid, cache, value, ainfo)
end
cache_raw = unwrap(cache.chunk)::AliasedObjectCacheStore
set_stored!(cache_raw, cache.space, value, ainfo, orig_space)
set_stored!(cache_raw, cache.space, value, ainfo)
return
end
function set_key_stored!(cache::AliasedObjectCache, space::MemorySpace, ainfo::AbstractAliasing, value::Chunk)
wid = root_worker_id(cache.chunk)
if wid != myid()
return remotecall_fetch(set_key_stored!, wid, cache, space, ainfo, value)
end
cache_raw = unwrap(cache.chunk)::AliasedObjectCacheStore
set_key_stored!(cache_raw, space, ainfo, value)
end
function aliased_object!(f, cache::AliasedObjectCache, x; ainfo=aliasing(x, identity))
x_space = memory_space(x)
if !is_key_present(cache, x_space, ainfo)
# Preserve the object's memory-space/processor pairing when inserting
# the source key. Using bare `tochunk(x)` defaults to OSProc, which can
# incorrectly wrap GPU-backed objects as CPU chunks.
x_chunk = x isa Chunk ? x : tochunk(x, first(processors(x_space)))
set_key_stored!(cache, x_space, ainfo, x_chunk)
end
if is_stored(cache, ainfo)
return get_stored(cache, ainfo)
else
Expand All @@ -325,7 +353,7 @@ function aliased_object!(f, cache::AliasedObjectCache, x; ainfo=aliasing(x, iden
if memory_space(x) != cache.space
@assert ainfo != aliasing(y, identity) "Aliasing mismatch! $ainfo == $(aliasing(y, identity))"
end
set_stored!(cache, y, ainfo, memory_space(x))
set_stored!(cache, y, ainfo)
return y
end
end
Expand Down Expand Up @@ -799,12 +827,8 @@ for wrapper in (UpperTriangular, LowerTriangular, UnitUpperTriangular, UnitLower
end
end
function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, v::Base.RefValue)
to_w = root_worker_id(to_proc)
p_chunk = rewrap_aliased_object!(cache, from_proc, to_proc, from_space, to_space, v[])
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p_chunk) do from_proc, to_proc, from_space, to_space, p_chunk
p_new = move(from_proc, to_proc, p_chunk)
v_new = Ref(p_new)
return tochunk(v_new, to_proc)
return aliased_object!(cache, v) do v
return remotecall_endpoint(identity, from_proc, to_proc, from_space, to_space, v)
end
end
#= FIXME: Make this work so we can automatically move-rewrap recursive objects
Expand Down
11 changes: 10 additions & 1 deletion src/datadeps/chunkview.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ struct ChunkView{N}
slices::NTuple{N, Union{Int, AbstractRange{Int}, Colon}}
end

function _identity_hash(arg::ChunkView, h::UInt=UInt(0))
return hash(arg.slices, _identity_hash(arg.chunk, h))
end

function Base.view(c::Chunk, slices...)
if c.domain isa ArrayDomain
nd, sz = ndims(c.domain), size(c.domain)
Expand Down Expand Up @@ -39,7 +43,12 @@ isremotehandle(x::ChunkView) = true

function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, slice::ChunkView)
to_w = root_worker_id(to_proc)
p_chunk = rewrap_aliased_object!(cache, from_proc, to_proc, from_space, to_space, slice.chunk)
# N.B. We use move_rewrap (not rewrap_aliased_object!) so that if the inner
# chunk is a SubArray, it goes through the SubArray-aware path which shares
# the parent array via the aliased object cache. Using rewrap_aliased_object!
# would simply serialize the entire SubArray, creating a new parent copy on
# the destination, breaking aliasing with other views of the same parent.
p_chunk = move_rewrap(cache, from_proc, to_proc, from_space, to_space, slice.chunk)
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p_chunk, slice.slices) do from_proc, to_proc, from_space, to_space, p_chunk, inds
p_new = move(from_proc, to_proc, p_chunk)
v_new = view(p_new, inds...)
Expand Down
50 changes: 36 additions & 14 deletions src/datadeps/remainders.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ This is used to perform partial data copies that only update the "remainder" reg
struct RemainderAliasing{S<:MemorySpace} <: AbstractAliasing
space::S
spans::Vector{Tuple{LocalMemorySpan,LocalMemorySpan}}
ainfos::Vector{AliasingWrapper}
syncdeps::Set{ThunkSyncdep}
end
RemainderAliasing(space::S, spans::Vector{Tuple{LocalMemorySpan,LocalMemorySpan}}, syncdeps::Set{ThunkSyncdep}) where S =
RemainderAliasing{S}(space, spans, syncdeps)
RemainderAliasing(space::S, spans::Vector{Tuple{LocalMemorySpan,LocalMemorySpan}}, ainfos::Vector{AliasingWrapper}, syncdeps::Set{ThunkSyncdep}) where S =
RemainderAliasing{S}(space, spans, ainfos, syncdeps)

memory_spans(ra::RemainderAliasing) = ra.spans

Expand Down Expand Up @@ -136,7 +137,7 @@ function compute_remainder_for_arg!(state::DataDepsState,
end

# Create our tracker
tracker = Dict{MemorySpace,Tuple{Vector{Tuple{LocalMemorySpan,LocalMemorySpan}},Set{ThunkSyncdep}}}()
tracker = Dict{MemorySpace,Tuple{Vector{Tuple{LocalMemorySpan,LocalMemorySpan}},Vector{AliasingWrapper},Set{ThunkSyncdep}}}()

# Walk backwards through the history of writes to this target
# other_ainfo is the overlapping ainfo that was written to
Expand Down Expand Up @@ -187,13 +188,14 @@ function compute_remainder_for_arg!(state::DataDepsState,
other_space_idx = something(findfirst(==(other_space), spaces))
target_space_idx = something(findfirst(==(target_space), spaces))
tracker_other_space = get!(tracker, other_space) do
(Vector{Tuple{LocalMemorySpan,LocalMemorySpan}}(), Set{ThunkSyncdep}())
(Vector{Tuple{LocalMemorySpan,LocalMemorySpan}}(), Vector{AliasingWrapper}(), Set{ThunkSyncdep}())
end
@opcounter :compute_remainder_for_arg_schedule
has_overlap = schedule_remainder!(tracker_other_space[1], other_space_idx, target_space_idx, remainder, other_many_spans)
if compute_syncdeps && has_overlap
@assert haskey(state.ainfos_owner, other_ainfo) "[idx $idx] ainfo $(typeof(other_ainfo)) has no owner"
get_read_deps!(state, other_space, other_ainfo, write_num, tracker_other_space[2])
get_read_deps!(state, other_space, other_ainfo, write_num, tracker_other_space[3])
push!(tracker_other_space[2], other_ainfo)
end
end
VERIFY_SPAN_CURRENT_OBJECT[] = nothing
Expand All @@ -206,9 +208,9 @@ function compute_remainder_for_arg!(state::DataDepsState,
mra = MultiRemainderAliasing()
for space in spaces
if haskey(tracker, space)
spans, syncdeps = tracker[space]
spans, ainfos, syncdeps = tracker[space]
if !isempty(spans)
push!(mra.remainders, RemainderAliasing(space, spans, syncdeps))
push!(mra.remainders, RemainderAliasing(space, spans, ainfos, syncdeps))
end
end
end
Expand Down Expand Up @@ -274,6 +276,8 @@ function enqueue_remainder_copy_to!(state::DataDepsState, dest_space::MemorySpac
push!(remainder_syncdeps, syncdep)
end
empty!(remainder_aliasing.syncdeps) # We can't bring these to move!
source_ainfos = copy(remainder_aliasing.ainfos)
empty!(remainder_aliasing.ainfos)
get_write_deps!(state, dest_space, target_ainfo, write_num, remainder_syncdeps)

@dagdebug task.uid :spawn_datadeps "($(repr(f)))[$(idx-1)][$dep_mod] Remainder copy-to has $(length(remainder_syncdeps)) syncdeps"
Expand All @@ -285,7 +289,10 @@ function enqueue_remainder_copy_to!(state::DataDepsState, dest_space::MemorySpac
copy_task = Dagger.@spawn scope=dest_scope exec_scope=dest_scope syncdeps=remainder_syncdeps meta=true Dagger.move!(remainder_aliasing, dest_space, source_space, arg_dest, arg_source)
@maybelog ctx timespan_finish(ctx, :datadeps_copy, (;id), (;thunk_id=copy_task.uid, from_space=source_space, to_space=dest_space, arg_w, from_arg=arg_source, to_arg=arg_dest))

# This copy task becomes a new writer for the target region
# This copy task reads the sources and writes to the target
for ainfo in source_ainfos
add_reader!(state, arg_w, source_space, ainfo, copy_task, write_num)
end
add_writer!(state, arg_w, dest_space, target_ainfo, copy_task, write_num)
end
"""
Expand Down Expand Up @@ -323,6 +330,8 @@ function enqueue_remainder_copy_from!(state::DataDepsState, dest_space::MemorySp
push!(remainder_syncdeps, syncdep)
end
empty!(remainder_aliasing.syncdeps) # We can't bring these to move!
source_ainfos = copy(remainder_aliasing.ainfos)
empty!(remainder_aliasing.ainfos)
get_write_deps!(state, dest_space, target_ainfo, write_num, remainder_syncdeps)

@dagdebug nothing :spawn_datadeps "($(typeof(arg_w.arg)))[$dep_mod] Remainder copy-from has $(length(remainder_syncdeps)) syncdeps"
Expand All @@ -334,7 +343,10 @@ function enqueue_remainder_copy_from!(state::DataDepsState, dest_space::MemorySp
copy_task = Dagger.@spawn scope=dest_scope exec_scope=dest_scope syncdeps=remainder_syncdeps meta=true Dagger.move!(remainder_aliasing, dest_space, source_space, arg_dest, arg_source)
@maybelog ctx timespan_finish(ctx, :datadeps_copy, (;id), (;thunk_id=copy_task.uid, from_space=source_space, to_space=dest_space, arg_w, from_arg=arg_source, to_arg=arg_dest))

# This copy task becomes a new writer for the target region
# This copy task reads the sources and writes to the target
for ainfo in source_ainfos
add_reader!(state, arg_w, source_space, ainfo, copy_task, write_num)
end
add_writer!(state, arg_w, dest_space, target_ainfo, copy_task, write_num)
end

Expand Down Expand Up @@ -367,7 +379,8 @@ function enqueue_copy_to!(state::DataDepsState, dest_space::MemorySpace, arg_w::
copy_task = Dagger.@spawn scope=dest_scope exec_scope=dest_scope syncdeps=copy_syncdeps meta=true Dagger.move!(dep_mod, dest_space, source_space, arg_dest, arg_source)
@maybelog ctx timespan_finish(ctx, :datadeps_copy, (;id), (;thunk_id=copy_task.uid, from_space=source_space, to_space=dest_space, arg_w, from_arg=arg_source, to_arg=arg_dest))

# This copy task becomes a new writer for the target region
# This copy task reads the source and writes to the target
add_reader!(state, arg_w, source_space, source_ainfo, copy_task, write_num)
add_writer!(state, arg_w, dest_space, target_ainfo, copy_task, write_num)
end
function enqueue_copy_from!(state::DataDepsState, dest_space::MemorySpace, arg_w::ArgumentWrapper,
Expand Down Expand Up @@ -398,7 +411,8 @@ function enqueue_copy_from!(state::DataDepsState, dest_space::MemorySpace, arg_w
copy_task = Dagger.@spawn scope=dest_scope exec_scope=dest_scope syncdeps=copy_syncdeps meta=true Dagger.move!(dep_mod, dest_space, source_space, arg_dest, arg_source)
@maybelog ctx timespan_finish(ctx, :datadeps_copy, (;id), (;thunk_id=copy_task.uid, from_space=source_space, to_space=dest_space, arg_w, from_arg=arg_source, to_arg=arg_dest))

# This copy task becomes a new writer for the target region
# This copy task reads the source and writes to the target
add_reader!(state, arg_w, source_space, source_ainfo, copy_task, write_num)
add_writer!(state, arg_w, dest_space, target_ainfo, copy_task, write_num)
end

Expand Down Expand Up @@ -495,12 +509,20 @@ for wrapper in (UpperTriangular, LowerTriangular, UnitUpperTriangular, UnitLower
write_remainder!(copies, copies_offset, parent(to), to_ptr, n)
end
end
# N.B. We don't handle pointer aliasing in remainder copies

function read_remainder!(copies::Vector{UInt8}, copies_offset::UInt64, from::Base.RefValue, from_ptr::UInt64, n::UInt64)
read_remainder!(copies, copies_offset, from[], from_ptr, n)
if from_ptr == UInt64(Base.pointer_from_objref(from) + fieldoffset(typeof(from), 1))
unsafe_copyto!(pointer(copies, copies_offset), Ptr{UInt8}(from_ptr), n)
else
read_remainder!(copies, copies_offset, from[], from_ptr, n)
end
end
function write_remainder!(copies::Vector{UInt8}, copies_offset::UInt64, to::Base.RefValue, to_ptr::UInt64, n::UInt64)
write_remainder!(copies, copies_offset, to[], to_ptr, n)
if to_ptr == UInt64(Base.pointer_from_objref(to) + fieldoffset(typeof(to), 1))
unsafe_copyto!(Ptr{UInt8}(to_ptr), pointer(copies, copies_offset), n)
else
write_remainder!(copies, copies_offset, to[], to_ptr, n)
end
end

function find_object_holding_ptr(A::SparseMatrixCSC, ptr::UInt64)
Expand Down
24 changes: 18 additions & 6 deletions src/memory-spaces.jl
Original file line number Diff line number Diff line change
Expand Up @@ -336,19 +336,20 @@ Base.:(==)(ca1::CombinedAliasing, ca2::CombinedAliasing) =
Base.hash(ca1::CombinedAliasing, h::UInt) =
hash(ca1.sub_ainfos, hash(CombinedAliasing, h))

struct ObjectAliasing <: AbstractAliasing
ptr::Ptr{Cvoid}
struct ObjectAliasing{S<:MemorySpace} <: AbstractAliasing
ptr::RemotePtr{Cvoid,S}
sz::UInt
end
ObjectAliasing(ptr::RemotePtr{Cvoid,S}, sz::Integer) where {S<:MemorySpace} =
ObjectAliasing{S}(ptr, UInt(sz))
function ObjectAliasing(x::T) where T
@nospecialize x
ptr = pointer_from_objref(x)
ptr = RemotePtr{Cvoid}(pointer_from_objref(x))
sz = sizeof(T)
return ObjectAliasing(ptr, sz)
end
function memory_spans(oa::ObjectAliasing)
rptr = RemotePtr{Cvoid}(oa.ptr)
span = MemorySpan{CPURAMMemorySpace}(rptr, oa.sz)
function memory_spans(oa::ObjectAliasing{S}) where S
span = MemorySpan{S}(oa.ptr, oa.sz)
return [span]
end

Expand Down Expand Up @@ -402,6 +403,17 @@ end
aliasing(x::DTask, T) = aliasing(fetch(x; raw=true), T)
aliasing(x::DTask) = aliasing(fetch(x; raw=true))

function aliasing(x::Base.RefValue{T}) where T
addr = UInt(Base.pointer_from_objref(x) + fieldoffset(typeof(x), 1))
ptr = RemotePtr{Cvoid}(addr, CPURAMMemorySpace(myid()))
ainfo = ObjectAliasing(ptr, sizeof(x))
if isassigned(x) && type_may_alias(T) && type_may_alias(typeof(x[]))
return CombinedAliasing([ainfo, aliasing(x[])])
else
return CombinedAliasing([ainfo])
end
end

struct ContiguousAliasing{S} <: AbstractAliasing
span::MemorySpan{S}
end
Expand Down
8 changes: 4 additions & 4 deletions src/utils/haloarray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ end
memory_space(A::HaloArray) = memory_space(A.center)

function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, A::HaloArray)
center_chunk = rewrap_aliased_object!(cache, from_proc, to_proc, from_space, to_space, A.center)
halo_chunks = ntuple(i -> rewrap_aliased_object!(cache, from_proc, to_proc, from_space, to_space, A.halos[i]), length(A.halos))
center_chunk = move_rewrap(cache, from_proc, to_proc, from_space, to_space, A.center)
halo_chunks = ntuple(i -> move_rewrap(cache, from_proc, to_proc, from_space, to_space, A.halos[i]), length(A.halos))
halo_width = A.halo_width
to_w = root_worker_id(to_proc)
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, center_chunk, halo_chunks, halo_width) do from_proc, to_proc, from_space, to_space, center_chunk, halo_chunks, halo_width
center_new = move(from_proc, to_proc, center_chunk)
halos_new = ntuple(i -> move(from_proc, to_proc, halo_chunks[i]), length(halo_chunks))
center_new = unwrap(center_chunk)
halos_new = ntuple(i -> unwrap(halo_chunks[i]), length(halo_chunks))
return tochunk(HaloArray(center_new, halos_new, halo_width), to_proc)
end
end
Expand Down
Loading
Loading