First of all, thank you again for creating these packages!
I have a use case where I need to process a large number of tables in a windowed query (think tables saved by the hour, where I need two-hour averages).
I discussed this with @jpsamaroo at JuliaCon, and the suggestion was to simply operate on the chunks underpinning the DTable, but I've been struggling to produce a working example (see the MWE below).
I either get an error about indexing: `ERROR: ArgumentError: invalid index: EagerThunk (running) of type Dagger.EagerThunk`
or an error about Chunk: `ERROR: type Chunk has no field a`.
I probably wrongly assumed that wrapping the access in `Dagger.@spawn` would delay execution until the value is ready (i.e., that the inner function would never see the raw `Chunk`).
MWE:

```julia
using Dagger, DTables, DataFrames

# starting from a DataFrame to keep things simple
x = DataFrame(; a=1:10_000, b=1:10_000)

# create 10 partitions
d = DTable(x, 1000)

results = []
for i in 1:(length(d.chunks) - 1)
    tbl = Dagger.@spawn d.chunks[i]
    tbl_next = Dagger.@spawn d.chunks[i+1]
    # run on the current+next partition and calculate the average of column :a
    out = Dagger.@spawn sum(tbl.a .+ tbl_next.a) / 2
    # in an ideal world, the return is an object, not a scalar (e.g., a DataFrame), hence the generic `push!`
    Dagger.@spawn push!(results, out)
end
```

Note: this is imperfect as it doesn't process the 10th partition, but that's okay. I wanted to keep the example as simple as possible.
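For completeness, here is the variant I would have expected to work: instead of indexing into thunks, pass the chunks directly as arguments to a spawned function, assuming the scheduler materializes `Chunk` arguments before calling it (the `two_chunk_mean` helper name is mine, just for illustration):

```julia
using Dagger, DTables, DataFrames

# hypothetical helper: by the time this runs, t1/t2 should be the
# materialized partition payloads, not Chunk wrappers
function two_chunk_mean(t1, t2)
    df1, df2 = DataFrame(t1), DataFrame(t2)
    # return an object (a one-row DataFrame) rather than a bare scalar
    DataFrame(; mean_a = sum(df1.a .+ df2.a) / 2)
end

x = DataFrame(; a=1:10_000, b=1:10_000)
d = DTable(x, 1000)

results = [Dagger.@spawn two_chunk_mean(d.chunks[i], d.chunks[i+1])
           for i in 1:(length(d.chunks) - 1)]
fetch.(results)  # collect the per-window DataFrames
```

Is this the intended pattern, or is there a more idiomatic API for sliding over partitions?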
Thank you!
EDIT:
A bonus question: how would one run this out-of-core, as Julian presented (with the deserialize/serialize arguments)?
I couldn't find that API on the main branch or in the docs, so I suspect it isn't released yet, right?