Skip to content

Commit c8de505

Browse files
feat: implement streaming file I/O to reduce memory use for concurrent and large-DAG processing
1 parent 96a03d4 commit c8de505

File tree

3 files changed

+3042
-60
lines changed

3 files changed

+3042
-60
lines changed

dag/dag.go

Lines changed: 139 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dag
22

33
import (
44
"fmt"
5+
"io"
56
"io/fs"
67
"os"
78
"path/filepath"
@@ -300,38 +301,59 @@ func processFile(entry fs.DirEntry, fullPath string, path *string, dag *DagBuild
300301
builder := CreateDagLeafBuilder(relPath)
301302
builder.SetType(FileLeafType)
302303

303-
fileData, err := os.ReadFile(fullPath)
304-
if err != nil {
305-
return nil, err
306-
}
304+
// Use streaming I/O to avoid loading entire file into memory
305+
// We process chunks incrementally, only keeping one chunk in memory at a time
306+
var singleChunk []byte
307+
chunkCount := 0
307308

308-
builder.SetType(FileLeafType)
309+
streamErr := streamFileChunks(fullPath, ChunkSize, func(chunk []byte, index int) error {
310+
chunkCount++
311+
if chunkCount == 1 {
312+
// Store first chunk - might be the only one
313+
singleChunk = chunk
314+
return nil
315+
}
309316

310-
if ChunkSize <= 0 {
311-
builder.SetData(fileData)
312-
} else {
313-
fileChunks := chunkFile(fileData, ChunkSize)
317+
// We have multiple chunks - need to process them as child leaves
318+
if chunkCount == 2 && singleChunk != nil {
319+
// Process the first chunk we stored earlier
320+
chunkEntryPath := filepath.Join(relPath, "0")
321+
chunkBuilder := CreateDagLeafBuilder(chunkEntryPath)
322+
chunkBuilder.SetType(ChunkLeafType)
323+
chunkBuilder.SetData(singleChunk)
314324

315-
if len(fileChunks) == 1 {
316-
builder.SetData(fileChunks[0])
317-
} else {
318-
for i, chunk := range fileChunks {
319-
// Use path-based naming for chunks to maintain compatibility
320-
chunkEntryPath := filepath.Join(relPath, strconv.Itoa(i))
321-
chunkBuilder := CreateDagLeafBuilder(chunkEntryPath)
325+
chunkLeaf, err := chunkBuilder.BuildLeaf(nil)
326+
if err != nil {
327+
return err
328+
}
322329

323-
chunkBuilder.SetType(ChunkLeafType)
324-
chunkBuilder.SetData(chunk)
330+
builder.AddLink(chunkLeaf.Hash)
331+
dag.AddLeaf(chunkLeaf, nil)
332+
singleChunk = nil // Release memory
333+
}
325334

326-
chunkLeaf, err := chunkBuilder.BuildLeaf(nil)
327-
if err != nil {
328-
return nil, err
329-
}
335+
// Process current chunk
336+
chunkEntryPath := filepath.Join(relPath, strconv.Itoa(index))
337+
chunkBuilder := CreateDagLeafBuilder(chunkEntryPath)
338+
chunkBuilder.SetType(ChunkLeafType)
339+
chunkBuilder.SetData(chunk)
330340

331-
builder.AddLink(chunkLeaf.Hash)
332-
dag.AddLeaf(chunkLeaf, nil)
333-
}
341+
chunkLeaf, err := chunkBuilder.BuildLeaf(nil)
342+
if err != nil {
343+
return err
334344
}
345+
346+
builder.AddLink(chunkLeaf.Hash)
347+
dag.AddLeaf(chunkLeaf, nil)
348+
return nil
349+
})
350+
if streamErr != nil {
351+
return nil, streamErr
352+
}
353+
354+
// If only one chunk, set it as data directly
355+
if chunkCount == 1 && singleChunk != nil {
356+
builder.SetData(singleChunk)
335357
}
336358

337359
result, err = builder.BuildLeaf(additionalData)
@@ -342,23 +364,57 @@ func processFile(entry fs.DirEntry, fullPath string, path *string, dag *DagBuild
342364
return result, nil
343365
}
344366

345-
func chunkFile(fileData []byte, chunkSize int) [][]byte {
346-
var chunks [][]byte
347-
fileSize := len(fileData)
367+
// streamFileChunks reads a file in chunks using streaming I/O, calling the callback for each chunk.
368+
// This is more memory efficient than reading the entire file into memory first.
369+
// If chunkSize <= 0, reads the entire file as a single chunk.
370+
func streamFileChunks(fullPath string, chunkSize int, callback func(chunk []byte, index int) error) error {
371+
file, err := os.Open(fullPath)
372+
if err != nil {
373+
return err
374+
}
375+
defer file.Close()
348376

377+
// If no chunk size specified, read entire file (fallback behavior)
349378
if chunkSize <= 0 {
350-
return [][]byte{fileData}
379+
data, err := io.ReadAll(file)
380+
if err != nil {
381+
return err
382+
}
383+
return callback(data, 0)
351384
}
352385

353-
for i := 0; i < fileSize; i += chunkSize {
354-
end := i + chunkSize
355-
if end > fileSize {
356-
end = fileSize
386+
// Stream file in chunks
387+
buffer := make([]byte, chunkSize)
388+
index := 0
389+
390+
for {
391+
n, err := io.ReadFull(file, buffer)
392+
if err == io.EOF {
393+
break
357394
}
358-
chunks = append(chunks, fileData[i:end])
395+
if err == io.ErrUnexpectedEOF {
396+
// Partial read - this is the last chunk
397+
chunk := make([]byte, n)
398+
copy(chunk, buffer[:n])
399+
if err := callback(chunk, index); err != nil {
400+
return err
401+
}
402+
break
403+
}
404+
if err != nil {
405+
return err
406+
}
407+
408+
// Full chunk read - make a copy since we reuse the buffer
409+
chunk := make([]byte, n)
410+
copy(chunk, buffer[:n])
411+
if err := callback(chunk, index); err != nil {
412+
return err
413+
}
414+
index++
359415
}
360416

361-
return chunks
417+
return nil
362418
}
363419

364420
// processEntryResult holds the result of processing a single entry in parallel
@@ -546,36 +602,59 @@ func processFileParallel(entry fs.DirEntry, fullPath string, path *string, dag *
546602
builder := CreateDagLeafBuilder(relPath)
547603
builder.SetType(FileLeafType)
548604

549-
fileData, err := os.ReadFile(fullPath)
550-
if err != nil {
551-
return nil, err
552-
}
605+
// Use streaming I/O to avoid loading entire file into memory
606+
// We process chunks incrementally, only keeping one chunk in memory at a time
607+
var singleChunk []byte
608+
chunkCount := 0
553609

554-
if ChunkSize <= 0 {
555-
builder.SetData(fileData)
556-
} else {
557-
fileChunks := chunkFile(fileData, ChunkSize)
610+
streamErr := streamFileChunks(fullPath, ChunkSize, func(chunk []byte, index int) error {
611+
chunkCount++
612+
if chunkCount == 1 {
613+
// Store first chunk - might be the only one
614+
singleChunk = chunk
615+
return nil
616+
}
558617

559-
if len(fileChunks) == 1 {
560-
builder.SetData(fileChunks[0])
561-
} else {
562-
for i, chunk := range fileChunks {
563-
// Use simple numeric string as ItemName for chunks
564-
// This makes alphabetical sorting work naturally: "0", "1", "2", ...
565-
chunkItemName := strconv.Itoa(i)
566-
chunkBuilder := CreateDagLeafBuilder(chunkItemName)
567-
chunkBuilder.SetType(ChunkLeafType)
568-
chunkBuilder.SetData(chunk)
569-
570-
chunkLeaf, err := chunkBuilder.BuildLeaf(nil)
571-
if err != nil {
572-
return nil, err
573-
}
618+
// We have multiple chunks - need to process them as child leaves
619+
if chunkCount == 2 && singleChunk != nil {
620+
// Process the first chunk we stored earlier
621+
// Use simple numeric string as ItemName for chunks
622+
chunkBuilder := CreateDagLeafBuilder("0")
623+
chunkBuilder.SetType(ChunkLeafType)
624+
chunkBuilder.SetData(singleChunk)
574625

575-
builder.AddLink(chunkLeaf.Hash)
576-
dag.AddLeafSafe(chunkLeaf, nil)
626+
chunkLeaf, err := chunkBuilder.BuildLeaf(nil)
627+
if err != nil {
628+
return err
577629
}
630+
631+
builder.AddLink(chunkLeaf.Hash)
632+
dag.AddLeafSafe(chunkLeaf, nil)
633+
singleChunk = nil // Release memory
578634
}
635+
636+
// Process current chunk
637+
chunkItemName := strconv.Itoa(index)
638+
chunkBuilder := CreateDagLeafBuilder(chunkItemName)
639+
chunkBuilder.SetType(ChunkLeafType)
640+
chunkBuilder.SetData(chunk)
641+
642+
chunkLeaf, err := chunkBuilder.BuildLeaf(nil)
643+
if err != nil {
644+
return err
645+
}
646+
647+
builder.AddLink(chunkLeaf.Hash)
648+
dag.AddLeafSafe(chunkLeaf, nil)
649+
return nil
650+
})
651+
if streamErr != nil {
652+
return nil, streamErr
653+
}
654+
655+
// If only one chunk, set it as data directly
656+
if chunkCount == 1 && singleChunk != nil {
657+
builder.SetData(singleChunk)
579658
}
580659

581660
result, err = builder.BuildLeaf(additionalData)

0 commit comments

Comments
 (0)